diff --git a/Cargo.toml b/Cargo.toml index 4b05b06..e77a1c8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,9 @@ [package] -name = "robert" -version = "0.4.0" +name = "bascet" +version = "0.1.0" edition = "2021" +description = "Bascet: a complete solution for single-cell analysis, with focus on microbial analysis" +authors = ["Hadrien GourlĂ©", "Johan Henriksson", "Julian Dicken"] [dependencies] anyhow = "1.0.93" diff --git a/README.md b/README.md index 3794b96..057228b 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,15 @@ Bascet is an advanced command-line tool aimed primarily to be used through the Z TODO: link to Zorn here +## Installation + +_TODO_ + +## Quick Usage + +`bascet` consists of several subcommands: + +- `bascet prepare`: demultiplex and trim fastq files and store reads in .cram, .bed, or .zip files ## Overview -- old -- diff --git a/src/cmd/getraw_cmd.rs b/src/cmd/getraw_cmd.rs deleted file mode 100644 index 23f106d..0000000 --- a/src/cmd/getraw_cmd.rs +++ /dev/null @@ -1,137 +0,0 @@ -use anyhow::Result; -use anyhow::bail; - -use clap::Args; -use std::{ - path::PathBuf, - sync::Arc, - thread, -}; - -pub const DEFAULT_PATH_TEMP: &str = "temp"; -pub const DEFAULT_CHEMISTRY: &str = "atrandi_wgs"; - - -use crate::command::getraw::GetRaw; -use crate::command::getraw::GetRawParams; - -use crate::barcode::AtrandiWGSChemistry; -use crate::barcode::AtrandiRNAseqChemistry; -use crate::barcode::GeneralCombinatorialBarcode; - - -#[derive(Args)] -pub struct GetRawCMD { - // FASTQ for r1 - #[arg(long = "r1", value_parser)] - pub path_forward: PathBuf, - - // FASTQ for r2 - #[arg(long = "r2", value_parser)] - pub path_reverse: PathBuf, - - // Output filename for complete reads - #[arg(short = 'o', long="out-complete", value_parser)] - pub path_output_complete: PathBuf, - - // Output filename for incomplete reads - #[arg(long = "out-incomplete", value_parser)] - pub path_output_incomplete: PathBuf, - - // Optional: chemistry with barcodes to use - #[arg(long = "chemistry", value_parser, default_value = DEFAULT_CHEMISTRY)] - pub chemistry: String, - - // Optional: file with barcodes to use - #[arg(long = "barcodes", value_parser)] - pub path_barcodes: Option, - - - // Temporary file directory. TODO - use system temp directory as default? TEMP variable? - #[arg(short = 't', value_parser, default_value = DEFAULT_PATH_TEMP)] - pub path_tmp: PathBuf, - - //Whether to sort or not - #[arg(long = "no-sort")] - pub no_sort: bool, - - // Optional: How many threads to use. Need better way of specifying? TODO - #[arg(long, value_parser = clap::value_parser!(usize))] - threads_work: Option, -} -impl GetRawCMD { - pub fn try_execute(&mut self) -> Result<()> { - - crate::fileformat::verify_input_fq_file(&self.path_forward)?; - crate::fileformat::verify_input_fq_file(&self.path_reverse)?; - - let threads_work = self.resolve_thread_config()?; - - let params_io = GetRawParams { - - path_tmp: self.path_tmp.clone(), - path_forward: self.path_forward.clone(), - path_reverse: self.path_reverse.clone(), - path_output_complete: self.path_output_complete.clone(), - path_output_incomplete: self.path_output_incomplete.clone(), - //barcode_file: self.barcode_file.clone(), - sort: !self.no_sort, - threads_work: threads_work, - }; - - // Start the debarcoding for specified chemistry - if self.chemistry == "atrandi_wgs" { - let _ = GetRaw::getraw( - Arc::new(params_io), - &mut AtrandiWGSChemistry::new() - ); - } else if self.chemistry == "atrandi_rnaseq" { - let _ = GetRaw::getraw( - Arc::new(params_io), - &mut AtrandiRNAseqChemistry::new() - ); - } else if self.chemistry == "combinatorial" { - if let Some(path_barcodes) = &self.path_barcodes { - let _ = GetRaw::getraw( - Arc::new(params_io), - &mut GeneralCombinatorialBarcode::new(&path_barcodes) - ); - } else { - bail!("Barcode file not specified"); - } - } else if self.chemistry == "10x" { - panic!("not implemented"); - } else if self.chemistry == "parsebio" { - panic!("not implemented"); - - } else { - bail!("Unidentified chemistry"); - } - - log::info!("GetRaw has finished succesfully"); - Ok(()) - } - - - - - - /////// todo keep this or not? - fn resolve_thread_config(&self) -> Result { - let available_threads = thread::available_parallelism() - .map_err(|e| anyhow::anyhow!("Failed to get available threads: {}", e))? - .get(); - - if available_threads < 2 { - println!("Warning: less than two threads reported to be available"); - } - - Ok(available_threads-1) - } - - -} - - - - diff --git a/src/cmd/mod.rs b/src/cmd/mod.rs deleted file mode 100644 index 9d2beda..0000000 --- a/src/cmd/mod.rs +++ /dev/null @@ -1,21 +0,0 @@ -pub mod extract_cmd; -pub mod mapcell_cmd; -pub mod getraw_cmd; -pub mod shardify_cmd; -pub mod featurise_cmd; -pub mod transform_cmd; -pub mod query_cmd; -pub mod bam2fragments_cmd; -pub mod kraken_cmd; - -pub use extract_cmd::ExtractCMD; -pub use mapcell_cmd::MapCellCMD; -pub use getraw_cmd::GetRawCMD; -pub use shardify_cmd::ShardifyCMD; -pub use featurise_cmd::FeaturiseCMD; -pub use transform_cmd::TransformCMD; -pub use query_cmd::QueryCMD; -pub use bam2fragments_cmd::Bam2FragmentsCMD; - -pub use kraken_cmd::KrakenCMD; - diff --git a/src/command/getraw.rs b/src/command/getraw.rs deleted file mode 100644 index cca1e09..0000000 --- a/src/command/getraw.rs +++ /dev/null @@ -1,549 +0,0 @@ -// This file is part of babbles which is released under the MIT license. -// See file LICENSE or go to https://github.com/HadrienG/babbles for full license details. -use log::{debug, info}; -use seq_io::fastq::OwnedRecord; -use std::fs; -use std::fs::File; -use std::path::PathBuf; -use std::process::Command; -use std::process::Stdio; -use std::sync::Arc; -use std::sync::Mutex; -use crossbeam::channel::Sender; -use crossbeam::channel::Receiver; -use std::io::{BufWriter, Write, Read}; - -use seq_io::fastq::Reader as FastqReader; -use seq_io::fastq::Record as FastqRecord; - -use crate::barcode::Chemistry; - -use crate::fileformat::tirp; -use crate::fileformat::shard; -use crate::fileformat::shard::CellID; -use crate::fileformat::shard::ReadPair; - - - - - -pub struct GetRawParams { - pub path_tmp: std::path::PathBuf, - - pub path_forward: std::path::PathBuf, - pub path_reverse: std::path::PathBuf, - pub path_output_complete: std::path::PathBuf, - pub path_output_incomplete: std::path::PathBuf, - - pub sort: bool, - - pub threads_work: usize, - -} - - - - -#[derive(Debug,Clone)] -struct RecordPair { - reverse_record: OwnedRecord, - forward_record: OwnedRecord -} - -type ListReadWithBarcode = Arc>; -type ListRecordPair = Arc>; - - - - - - - -///// loop for a writer thread, sending output to a Writer -pub fn loop_tirp_writer( - rx: &Arc>>, - hist: &mut shard::BarcodeHistogram, - writer: W -) where W:Write { - - let mut writer= BufWriter::new(writer); - - // Write reads - let mut n_written=0; - while let Ok(Some(list_pairs)) = rx.recv() { - for (bam_cell, cell_id) in list_pairs.iter() { - - tirp::write_records_pair_to_tirp( //:: - &mut writer, - &cell_id, - &bam_cell - ); - - hist.inc(&cell_id); - - if n_written%100000 == 0 { - println!("#reads written to outfile: {:?}", n_written); - } - n_written = n_written + 1; - } - } - - //absolutely have to call this before dropping, for bufwriter - _ = writer.flush(); - -} - - - - - -//////////////// Writer to TIRP format, sorting on the fly -fn create_writer_thread( - outfile: &PathBuf, - thread_pool: &threadpool::ThreadPool, - list_hist: &Arc>>, - sort: bool, - tempdir: &PathBuf -) -> anyhow::Result>>> { - - let outfile = outfile.clone(); - - let list_hist = Arc::clone(list_hist); - let tempdir = tempdir.clone(); - //Create input read queue - //Limit how many chunks can be in pipe - let (tx, rx) = crossbeam::channel::bounded::>(100); - let (tx, rx) = (Arc::new(tx), Arc::new(rx)); - - thread_pool.execute(move || { - // Open output file - println!("Creating pre-TIRP output file: {}",outfile.display()); - - let file_output = File::create(outfile).unwrap(); - - let mut hist = shard::BarcodeHistogram::new(); - - if sort { - - //Pipe to sort, then to given file - let mut cmd = Command::new("sort"); - cmd.arg(format!("--temporary-directory={}", tempdir.display())); - - let mut process = cmd - .stdin(Stdio::piped()) - .stdout(Stdio::from(file_output)) - .spawn().expect("failed to start sorter"); - - let mut stdin = process.stdin.as_mut().unwrap(); - - debug!("sorter process ready"); - loop_tirp_writer(&rx, &mut hist, &mut stdin); - - //Wait for process to finish - debug!("Waiting for sorter process to exit"); - let _result = process.wait().unwrap(); - - //TODO how to terminate pipe?? seems to happen now anyway - - } else { - - debug!("starting non-sorted write loop"); - - let mut writer=BufWriter::new(file_output); //TODO put in a buffered writer in loop. no need to do twice - loop_tirp_writer(&rx, &mut hist, &mut writer); - _ = writer.flush(); - - } - - //Keep histogram for final summary - { - let mut list_hist = list_hist.lock().unwrap();//: Mutex> - list_hist.push(hist); - } -// _ = tx_hist.send(Arc::new(hist)); - - }); - Ok(tx) -} - - - - - - - - - - - -pub struct GetRaw {} - -impl GetRaw { - pub fn getraw<'a>( - params_io: Arc, - barcodes: &mut (impl Chemistry+Clone+Send+'static) - ) -> anyhow::Result<()> { - - info!("Running command: getraw"); - println!("Will sort: {}", params_io.sort); - - if false { - crate::utils::check_bgzip().expect("bgzip not found"); - crate::utils::check_tabix().expect("tabix not found"); - println!("Required software is in place"); - } - - - //Make temp dir - _ = fs::create_dir(¶ms_io.path_tmp); - - // Dispatch barcodes (presets + barcodes -> Vec) - //let mut barcodes = AtrandiChemistry::new(); - - // Open fastq files - let mut forward_file = open_fastq(¶ms_io.path_forward).unwrap(); - let mut reverse_file = open_fastq(¶ms_io.path_reverse).unwrap(); - - // Find probable barcode starts and ends - barcodes.prepare(&mut forward_file, &mut reverse_file).expect("Failed to detect barcode setup from reads"); - let mut forward_file = open_fastq(¶ms_io.path_forward).unwrap(); // reopen the file to read from beginning - let mut reverse_file = open_fastq(¶ms_io.path_reverse).unwrap(); // reopen the file to read from beginning - - // Start writer threads - let path_temp_complete_sorted = params_io.path_tmp.join(PathBuf::from("tmp_sorted_complete.bed")); - let path_temp_incomplete_sorted = params_io.path_tmp.join(PathBuf::from("tmp_sorted_incomplete.bed")); - - let list_hist_complete = Arc::new(Mutex::new(Vec::::new())); - let list_hist_incomplete = Arc::new(Mutex::new(Vec::::new())); - - let thread_pool_write = threadpool::ThreadPool::new(2); - let tx_writer_complete = create_writer_thread( - &path_temp_complete_sorted, - &thread_pool_write, - &list_hist_complete, - true, - ¶ms_io.path_tmp). - expect("Failed to get writer threads"); - let tx_writer_incomplete = create_writer_thread( - &path_temp_incomplete_sorted, - &thread_pool_write, - &list_hist_incomplete, - false, - ¶ms_io.path_tmp). - expect("Failed to get writer threads"); - - // Start worker threads. - // Limit how many chunks can be in the air at the same time, as writers must be able to keep up with the reader - let thread_pool_work = threadpool::ThreadPool::new(params_io.threads_work); - let (tx, rx) = crossbeam::channel::bounded::>(100); - let (tx, rx) = (Arc::new(tx), Arc::new(rx)); - for tidx in 0..params_io.threads_work { - let rx = Arc::clone(&rx); - let tx_writer_complete=Arc::clone(&tx_writer_complete); - let tx_writer_incomplete=Arc::clone(&tx_writer_incomplete); - - println!("Starting worker thread {}",tidx); - - let mut barcodes = barcodes.clone(); //This is needed to keep mutating the pattern in this structure - - thread_pool_work.execute(move || { - - while let Ok(Some(list_bam_cell)) = rx.recv() { - let mut pairs_complete: Vec<(ReadPair, CellID)> = Vec::with_capacity(list_bam_cell.len()); - let mut pairs_incomplete: Vec<(ReadPair, CellID)> = Vec::with_capacity(list_bam_cell.len()); - - for bam_cell in list_bam_cell.iter() { - - - let (is_ok, cellid, readpair) = barcodes.detect_barcode_and_trim( - &bam_cell.forward_record.seq(), - &bam_cell.forward_record.qual(), - &bam_cell.reverse_record.seq(), - &bam_cell.reverse_record.qual() - ); - - if is_ok { - pairs_complete.push((readpair, cellid)); - } else { - pairs_incomplete.push((readpair, cellid)); - } - } - - let _ = tx_writer_complete.send(Some(Arc::new(pairs_complete))); - let _ = tx_writer_incomplete.send(Some(Arc::new(pairs_incomplete))); - } - }); - } - - // Read the fastq files, send to worker threads - println!("Starting to read input file"); - read_all_reads( - &mut forward_file, - &mut reverse_file, - &tx - ); - - // Send termination signals to workers, then wait for them to complete - for _ in 0..params_io.threads_work { - let _ = tx.send(None); - } - thread_pool_work.join(); - - // Send termination signals to writers, then wait for them to complete - let _ = tx_writer_complete.send(None); - let _ = tx_writer_incomplete.send(None); - thread_pool_write.join(); - - - //Sort the complete output files and compress the output. - let mut list_inputfiles:Vec = Vec::new(); - list_inputfiles.push(path_temp_complete_sorted.clone()); - catsort_files( - &list_inputfiles, - ¶ms_io.path_output_complete, - params_io.sort - ); - - //// Concatenate also the incomplete reads. Sorting never needed - let mut list_inputfiles:Vec = Vec::new(); - list_inputfiles.push(path_temp_incomplete_sorted.clone()); - catsort_files( - &list_inputfiles, - ¶ms_io.path_output_incomplete, - false - ); - - //// Index the final file with tabix - println!("Indexing final output file"); - tirp::index_tirp(¶ms_io.path_output_complete).expect("Failed to index file"); - - //// Store histogram - println!("Storing histogram for final output file"); - debug!("Collecting histograms"); - sum_and_store_histogram( - &list_hist_complete, - &tirp::get_histogram_path_for_tirp(¶ms_io.path_output_complete) - ); - sum_and_store_histogram( - &list_hist_incomplete, - &tirp::get_histogram_path_for_tirp(¶ms_io.path_output_incomplete) - ); - - //// Remove temp files - debug!("Removing temp files"); - _ = fs::remove_dir_all(¶ms_io.path_tmp); - - info!("done!"); - - Ok(()) - } -} - - - -pub fn sum_and_store_histogram( - list_hist: &Arc>>, - path: &PathBuf -) { - debug!("Collecting histograms"); - - let list_hist = list_hist.lock().unwrap(); - - let mut totalhist = shard::BarcodeHistogram::new(); - for one_hist in list_hist.iter() { - //let one_hist = rx.recv().expect("Could not get one histrogram"); - totalhist.add_histogram(&one_hist); - } - totalhist.write(&path).expect(format!("Failed to write histogram to {:?}", path).as_str()); -} - - - - - - -////////// read the reads, send to next threads -fn read_all_reads( - forward_file: &mut FastqReader>, - reverse_file: &mut FastqReader>, - tx: &Arc>> -){ - let mut num_read = 0; - loop { - - //Read out chunks. By sending in blocks, we can - //1. keep threads asleep until they got enough work to do to motivate waking them up - //2. avoid send operations, which likely aren't for free - let chunk_size = 1000; - - let mut curit = 0; - let mut list_recpair:Vec = Vec::with_capacity(chunk_size); - while curit = record.expect("Error reading record rev"); - let forward_record = forward_file.next().unwrap().expect("Error reading record fwd"); - - let recpair = RecordPair { - reverse_record: reverse_record.to_owned_record(), - forward_record: forward_record.to_owned_record() - }; - list_recpair.push(recpair); - - num_read = num_read + 1; - - if num_read % 100000 == 0 { - println!("read: {:?}", num_read); - } - - } else { - break; - } - curit += 1; - } - - if !list_recpair.is_empty() { - let _ = tx.send(Some(Arc::new(list_recpair))); - } else { - break; - } - } - -} - - - -/// Concatenate or merge sort files, then process them with bgzip -// sort --merge some_sorted.pregascet.0 some_sorted.pregascet.1 ... | bgzip -c /dev/stdin > test.gascet.0.gz -// -// we could also skip bgsort but it should be fast; we need to recompress it later otherwise. but it is a huge gain in ratio and need for temporary space! -// also, output file is ready for use unless merging with other shards is needed -// -// later index: tabix -p bed test.gascet.0.gz -// able to get a cell: tabix out_complete.0.gascet.gz A1_H5_D9_H12 | head -pub fn catsort_files( - list_inputfiles: &Vec, - path_final: &PathBuf, - sort: bool -) { - let use_bgzip = true; - - //Final destination file - let file_final_output = File::create(path_final).unwrap(); - println!("Compressing and writing final output file: {:?} from input files {:?}",path_final, list_inputfiles); - - //Compress on the fly with bgzip, pipe output to a file - let mut process_b = if use_bgzip { - let mut process_b = Command::new("bgzip"); - process_b.arg("-c").arg("/dev/stdin"); - process_b - } else { - // for testing on osx without bgzip - print!("Warning: using gzip for final file. This will not work with tabix later. Not recommended"); - Command::new("gzip") - }; - let process_b = process_b. - stdin(Stdio::piped()). - stdout(Stdio::from(file_final_output)). - spawn() - .expect("Failed to start zip-command"); - - //Sort or concatenate - let mut process_a = if sort { - let mut cmd = Command::new("sort"); - cmd.arg("--merge"); - cmd - } else { - Command::new("cat") - }; - - //Provide all previously written output files to sort/cat - let list_inputfiles:Vec = list_inputfiles.iter().map(|p| p.to_str().expect("failed to convert path to string").to_string()).collect(); - process_a.args(list_inputfiles); - - - //Wait for the process to finish - let out= process_a. - stdout(process_b.stdin.expect("failed to get stdin on bgzip")). - output(). - expect("failed to get result from bgzip"); - println!("{}", String::from_utf8(out.stdout).unwrap()); - - -} - - - - - - - - - - - -pub fn open_fastq(file_handle: &PathBuf) -> anyhow::Result>> { - - let opened_handle = File::open(file_handle). - expect(format!("Could not open fastq file {}", &file_handle.display()).as_str()); - - let (reader,compression) = niffler::get_reader(Box::new(opened_handle)). - expect(format!("Could not open fastq file {}", &file_handle.display()).as_str()); - - debug!( - "Opened file {} with compression {:?}", - &file_handle.display(), - &compression - ); - Ok(FastqReader::new(reader)) -} - - - - - -#[cfg(test)] -mod tests { -/* - use super::*; - - #[test] - fn test_get_boundaries() { - let pool = 1; - let starts: Vec<(u32, usize, usize)> = vec![(2, 10, 20), (1, 30, 40)]; - assert_eq!(get_boundaries(pool, &starts), (30, 40)); - } - */ - - /* - #[test] - fn test_validate_barcode_inputs_and_pools() { - let no_barcodes = vec![]; - let preset: Option = Some(PathBuf::from("data/barcodes/atrandi/barcodes.tsv")); - let bc = read_barcodes(&no_barcodes, &preset); - assert_eq!(bc[0].sequence, b"GTAACCGA".to_vec()); - assert_eq!(bc[0].name, "A1"); - - let pools = get_pools(&bc); - assert_eq!(pools, HashSet::from([1, 2, 3, 4])); - }*/ - - /* - #[test] - fn test_find_probable_barcode_boundaries() { - let reads_file = PathBuf::from("data/test_reads_R2.fastq"); - let reads = io::open_fastq(&reads_file); - - let mut barcodes = vec![io::Barcode { - index: 0, - name: "A1".to_string(), - pool: 1, - sequence: b"GTAACCGA".to_vec(), - pattern: Myers::::new(b"GTAACCGA".to_vec()), - }]; - let pools = get_pools(&barcodes); - let boundaries = barcode::find_probable_barcode_boundaries(reads, &mut barcodes, &pools, 9); - assert_eq!(boundaries, vec![(1, 36, 44)]); - } - */ -} - diff --git a/src/command/mod.rs b/src/command/mod.rs index 4a1de07..ea9a08c 100644 --- a/src/command/mod.rs +++ b/src/command/mod.rs @@ -1,13 +1,10 @@ - -pub mod getraw; +pub mod bam2fragments; +pub mod featurise; +pub mod kraken; pub mod mapcell; +pub mod query; pub mod shardify; pub mod transform; -pub mod featurise; -pub mod query; -pub mod bam2fragments; -pub mod kraken; - pub use query::Query; pub use query::QueryParams; @@ -18,21 +15,14 @@ pub use featurise::FeaturiseParams; pub use mapcell::MapCell; pub use mapcell::MapCellParams; -pub use getraw::GetRaw; -pub use getraw::GetRawParams; - pub use shardify::Shardify; pub use shardify::ShardifyParams; - pub use transform::TransformFile; pub use transform::TransformFileParams; - pub use bam2fragments::Bam2Fragments; pub use bam2fragments::Bam2FragmentsParams; - - pub use kraken::Kraken; -pub use kraken::KrakenParams; \ No newline at end of file +pub use kraken::KrakenParams; diff --git a/src/fileformat/fastq.rs b/src/fileformat/fastq.rs index f29d0ce..a8a9292 100644 --- a/src/fileformat/fastq.rs +++ b/src/fileformat/fastq.rs @@ -1,95 +1,79 @@ +use bgzip::{write::BGZFMultiThreadWriter, BGZFError, Compression}; +use log::debug; use std::fs::File; -use std::sync::Arc; use std::path::PathBuf; -use bgzip::{write::BGZFMultiThreadWriter, BGZFError, Compression}; - +use std::sync::Arc; -use crate::fileformat::{shard::{CellID, ReadPair}, CellUMI}; +use crate::fileformat::{ + shard::{CellID, ReadPair}, + CellUMI, +}; //use crate::fileformat::DetectedFileformat; use crate::fileformat::ReadPairWriter; use super::ConstructFromPath; //use crate::fileformat::ReadPairReader; +use seq_io::fastq::Reader as FastqReader; +use seq_io::fastq::Record as FastqRecord; - -#[derive(Debug,Clone)] -pub struct BascetFastqWriterFactory { -} +#[derive(Debug, Clone)] +pub struct BascetFastqWriterFactory {} impl BascetFastqWriterFactory { pub fn new() -> BascetFastqWriterFactory { BascetFastqWriterFactory {} - } + } } impl ConstructFromPath for BascetFastqWriterFactory { - fn new_from_path(&self, fname: &PathBuf) -> anyhow::Result { ///////// maybe anyhow prevents spec of reader? + fn new_from_path(&self, fname: &PathBuf) -> anyhow::Result { + ///////// maybe anyhow prevents spec of reader? BascetFastqWriter::new(fname) } } - pub struct BascetFastqWriter { - pub writer: BGZFMultiThreadWriter + pub writer: BGZFMultiThreadWriter, } impl BascetFastqWriter { - - fn new(path: &PathBuf) -> anyhow::Result{ + fn new(path: &PathBuf) -> anyhow::Result { let out_buffer = File::create(&path).expect("Failed to create fastq.gz output file"); let writer = BGZFMultiThreadWriter::new(out_buffer, Compression::default()); - - Ok(BascetFastqWriter { - writer: writer - }) - } + Ok(BascetFastqWriter { writer: writer }) + } } - impl ReadPairWriter for BascetFastqWriter { - - - fn write_reads_for_cell(&mut self, cell_id:&CellID, list_reads: &Arc>) { + fn write_reads_for_cell(&mut self, cell_id: &CellID, list_reads: &Arc>) { let mut read_num = 0; for rp in list_reads.iter() { - write_fastq_read( &mut self.writer, &make_fastq_readname(read_num, &cell_id, &rp.umi, 1), &rp.r1, - &rp.q1 - ).unwrap(); + &rp.q1, + ) + .unwrap(); write_fastq_read( &mut self.writer, &make_fastq_readname(read_num, &cell_id, &rp.umi, 2), &rp.r2, - &rp.q2 - ).unwrap(); + &rp.q2, + ) + .unwrap(); - read_num+=1; + read_num += 1; } } - } - - - - - - - - - - - - ////////// Write one FASTQ read fn write_fastq_read( writer: &mut W, head: &Vec, - seq:&Vec, - qual:&Vec + seq: &Vec, + qual: &Vec, ) -> Result<(), BGZFError> { writer.write_all(b">")?; writer.write_all(head.as_slice())?; @@ -101,21 +85,35 @@ fn write_fastq_read( Ok(()) } - //// Format FASTQ read names fn make_fastq_readname( - read_num: u32, - cell_id: &CellID, - cell_umi: &CellUMI, - illumna_read_index: u32 + read_num: u32, + cell_id: &CellID, + cell_umi: &CellUMI, + illumna_read_index: u32, ) -> Vec { // typical readname from a random illumina library from miseq, @M03699:250:000000000-DT36J:1:1102:5914:5953 1:N:0:GACGAGATTA+ACATTATCCT - let name=format!("BASCET_{}:{}:{} {}", - cell_id, - String::from_utf8(cell_umi.clone()).unwrap(), - read_num, - illumna_read_index); - name.as_bytes().to_vec() //TODO best if we can avoid making a String + let name = format!( + "BASCET_{}:{}:{} {}", + cell_id, + String::from_utf8(cell_umi.clone()).unwrap(), + read_num, + illumna_read_index + ); + name.as_bytes().to_vec() //TODO best if we can avoid making a String } +pub fn open_fastq(file_handle: &PathBuf) -> anyhow::Result>> { + let opened_handle = File::open(file_handle) + .expect(format!("Could not open fastq file {}", &file_handle.display()).as_str()); + + let (reader, compression) = niffler::get_reader(Box::new(opened_handle)) + .expect(format!("Could not open fastq file {}", &file_handle.display()).as_str()); + debug!( + "Opened file {} with compression {:?}", + &file_handle.display(), + &compression + ); + Ok(FastqReader::new(reader)) +} diff --git a/src/lib.rs b/src/lib.rs index 21239ac..fc01075 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,10 +1,10 @@ +pub mod barcode; pub mod command; -pub mod utils; pub mod fileformat; -pub mod cmd; -pub mod barcode; +pub mod kmer; pub mod mapcell; pub mod mapcell_scripts; -pub mod kmer; +pub mod subcommands; +pub mod utils; -pub mod playground; \ No newline at end of file +pub mod playground; diff --git a/src/main.rs b/src/main.rs index 1fb846a..b24b230 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,11 +2,10 @@ use std::process::ExitCode; use clap::{Parser, Subcommand}; -use robert::cmd; - +use bascet::subcommands; #[derive(Parser)] -#[command(version, about)] +#[command(version, about)] // reads from Cargo.toml struct Cli { #[command(subcommand)] command: Commands, @@ -14,25 +13,24 @@ struct Cli { #[derive(Subcommand)] enum Commands { - Getraw(cmd::GetRawCMD), - Mapcell(cmd::MapCellCMD), - Extract(cmd::ExtractCMD), - Shardify(cmd::ShardifyCMD), - Transform(cmd::TransformCMD), - Featurise(cmd::FeaturiseCMD), - Query(cmd::QueryCMD), - Bam2fragments(cmd::Bam2FragmentsCMD), - Kraken(cmd::KrakenCMD), + Prepare(subcommands::Prepare), + Mapcell(subcommands::MapCellCMD), + Extract(subcommands::ExtractCMD), + Shardify(subcommands::ShardifyCMD), + Transform(subcommands::TransformCMD), + Featurise(subcommands::FeaturiseCMD), + Query(subcommands::QueryCMD), + Bam2fragments(subcommands::Bam2FragmentsCMD), + Kraken(subcommands::KrakenCMD), } fn main() -> ExitCode { let cli = Cli::parse(); - env_logger::init(); let result = match cli.command { - Commands::Getraw(mut cmd) => cmd.try_execute(), + Commands::Prepare(mut cmd) => cmd.try_execute(), Commands::Mapcell(mut cmd) => cmd.try_execute(), Commands::Extract(mut cmd) => cmd.try_execute(), Commands::Shardify(mut cmd) => cmd.try_execute(), diff --git a/src/cmd/bam2fragments_cmd.rs b/src/subcommands/bam2fragments.rs similarity index 100% rename from src/cmd/bam2fragments_cmd.rs rename to src/subcommands/bam2fragments.rs diff --git a/src/cmd/extract_cmd.rs b/src/subcommands/extract.rs similarity index 100% rename from src/cmd/extract_cmd.rs rename to src/subcommands/extract.rs diff --git a/src/cmd/featurise_cmd.rs b/src/subcommands/featurise.rs similarity index 100% rename from src/cmd/featurise_cmd.rs rename to src/subcommands/featurise.rs diff --git a/src/cmd/kraken_cmd.rs b/src/subcommands/kraken.rs similarity index 100% rename from src/cmd/kraken_cmd.rs rename to src/subcommands/kraken.rs diff --git a/src/cmd/mapcell_cmd.rs b/src/subcommands/mapcell.rs similarity index 100% rename from src/cmd/mapcell_cmd.rs rename to src/subcommands/mapcell.rs diff --git a/src/subcommands/mod.rs b/src/subcommands/mod.rs new file mode 100644 index 0000000..a372e88 --- /dev/null +++ b/src/subcommands/mod.rs @@ -0,0 +1,20 @@ +pub mod bam2fragments; +pub mod extract; +pub mod featurise; +pub mod kraken; +pub mod mapcell; +pub mod prepare; +pub mod query; +pub mod shardify; +pub mod transform; + +pub use bam2fragments::Bam2FragmentsCMD; +pub use extract::ExtractCMD; +pub use featurise::FeaturiseCMD; +pub use mapcell::MapCellCMD; +pub use prepare::Prepare; +pub use query::QueryCMD; +pub use shardify::ShardifyCMD; +pub use transform::TransformCMD; + +pub use kraken::KrakenCMD; diff --git a/src/subcommands/prepare.rs b/src/subcommands/prepare.rs new file mode 100644 index 0000000..2467691 --- /dev/null +++ b/src/subcommands/prepare.rs @@ -0,0 +1,530 @@ +// This file is part of bascet which is released under the MIT license. +// See file LICENSE or go to https://github.com/henriksson-lab/bascet for full license details. +use std::fs::File; +use std::io::{BufWriter, Read, Write}; +use std::path::PathBuf; +use std::process::{Command, Stdio}; +use std::sync::{Arc, Mutex}; +use std::{fs, thread}; + +use anyhow::{bail, Result}; +use clap::Args; +use crossbeam::channel::{Receiver, Sender}; +use log::{debug, info}; + +use seq_io::fastq::OwnedRecord; +use seq_io::fastq::Reader as FastqReader; +use seq_io::fastq::Record as FastqRecord; + +use crate::barcode; +use crate::fileformat::{fastq, shard, tirp}; + +pub const DEFAULT_PATH_TEMP: &str = "temp"; +pub const DEFAULT_CHEMISTRY: &str = "atrandi_wgs"; + +#[derive(Debug, Clone)] +struct RecordPair { + reverse_record: OwnedRecord, + forward_record: OwnedRecord, +} + +type ListReadWithBarcode = Arc>; +type ListRecordPair = Arc>; + +// TODO maybe all the Params struct should be in an enum, and implement a validateparam trait? +pub struct PrepareParams { + pub path_tmp: std::path::PathBuf, + pub path_forward: std::path::PathBuf, + pub path_reverse: std::path::PathBuf, + pub path_output_complete: std::path::PathBuf, + pub path_output_incomplete: std::path::PathBuf, + pub sort: bool, + pub threads_work: usize, +} + +#[derive(Args)] +/// Detect and trim barcodes from raw reads, and convert to cram/bed/zip +pub struct Prepare { + /// forward reads + #[arg(short, long, value_parser)] + pub forward: PathBuf, + /// reverse reads + #[arg(short, long, value_parser)] + pub reverse: PathBuf, + /// Output filename for complete reads + #[arg(short = 'o', long = "out-complete", value_parser)] + pub path_output_complete: PathBuf, + /// Output filename for incomplete reads + #[arg(long = "out-incomplete", value_parser)] + pub path_output_incomplete: PathBuf, + /// Optional: chemistry with barcodes to use + #[arg(long = "chemistry", value_parser, default_value = DEFAULT_CHEMISTRY)] + pub chemistry: String, + /// Optional: file with barcodes to use + #[arg(long = "barcodes", value_parser)] + pub path_barcodes: Option, + /// Temporary file directory + //TODO - use system temp directory as default? TEMP variable? + #[arg(short = 't', value_parser, default_value = DEFAULT_PATH_TEMP)] + pub path_tmp: PathBuf, + ///Whether to sort or not + #[arg(long = "no-sort")] + pub no_sort: bool, + /// Optional: How many threads to use. Need better way of specifying? TODO + #[arg(long, value_parser = clap::value_parser!(usize))] + threads_work: Option, +} +impl Prepare { + pub fn try_execute(&mut self) -> Result<()> { + // validate inputs + // TODO that function should be in fileformat::fastq + crate::fileformat::verify_input_fq_file(&self.forward)?; + crate::fileformat::verify_input_fq_file(&self.reverse)?; + + let threads_work = self.resolve_thread_config()?; + + // TODO barcodes should be in that struct + let params_io = PrepareParams { + path_tmp: self.path_tmp.clone(), + path_forward: self.forward.clone(), + path_reverse: self.reverse.clone(), + path_output_complete: self.path_output_complete.clone(), + path_output_incomplete: self.path_output_incomplete.clone(), + //barcode_file: self.barcode_file.clone(), + sort: !self.no_sort, + threads_work: threads_work, + }; + + // Start the debarcoding for specified chemistry + // TODO call prepare here instead of GetRaw::getraw + if self.chemistry == "atrandi_wgs" { + let _ = self.prepare( + Arc::new(params_io), + &mut barcode::AtrandiWGSChemistry::new(), + ); + } else if self.chemistry == "atrandi_rnaseq" { + let _ = self.prepare( + Arc::new(params_io), + &mut barcode::AtrandiRNAseqChemistry::new(), + ); + } else if self.chemistry == "combinatorial" { + if let Some(path_barcodes) = &self.path_barcodes { + let _ = self.prepare( + Arc::new(params_io), + &mut barcode::GeneralCombinatorialBarcode::new(&path_barcodes), + ); + } else { + bail!("Barcode file not specified"); + } + } else if self.chemistry == "10x" { + panic!("not implemented"); + } else if self.chemistry == "parsebio" { + panic!("not implemented"); + } else { + bail!("Unidentified chemistry"); + } + + log::info!("bascet prepare has finished succesfully"); + Ok(()) + } + + pub fn prepare<'a>( + &mut self, + params_io: Arc, + // TODO barcodes should be validated and part of the params + barcodes: &mut (impl barcode::Chemistry + Clone + Send + 'static), + ) -> anyhow::Result<()> { + info!("Running command: bascet prepare"); + println!("Will sort: {}", params_io.sort); + + if false { + crate::utils::check_bgzip().expect("bgzip not found"); + crate::utils::check_tabix().expect("tabix not found"); + println!("Required software is in place"); + } + + //Make temp dir + _ = fs::create_dir(¶ms_io.path_tmp); + + // Dispatch barcodes (presets + barcodes -> Vec) + //let mut barcodes = AtrandiChemistry::new(); + + // Open fastq files + let mut forward_file = fastq::open_fastq(¶ms_io.path_forward).unwrap(); + let mut reverse_file = fastq::open_fastq(¶ms_io.path_reverse).unwrap(); + + // Find probable barcode starts and ends + barcodes + // TODO rename that function + .prepare(&mut forward_file, &mut reverse_file) + .expect("Failed to detect barcode setup from reads"); + let mut forward_file = fastq::open_fastq(¶ms_io.path_forward).unwrap(); // reopen the file to read from beginning + let mut reverse_file = fastq::open_fastq(¶ms_io.path_reverse).unwrap(); // reopen the file to read from beginning + + // Start writer threads + let path_temp_complete_sorted = params_io + .path_tmp + .join(PathBuf::from("tmp_sorted_complete.bed")); + let path_temp_incomplete_sorted = params_io + .path_tmp + .join(PathBuf::from("tmp_sorted_incomplete.bed")); + + let list_hist_complete = Arc::new(Mutex::new(Vec::::new())); + let list_hist_incomplete = Arc::new(Mutex::new(Vec::::new())); + + let thread_pool_write = threadpool::ThreadPool::new(2); + let tx_writer_complete = create_writer_thread( + &path_temp_complete_sorted, + &thread_pool_write, + &list_hist_complete, + true, + ¶ms_io.path_tmp, + ) + .expect("Failed to get writer threads"); + let tx_writer_incomplete = create_writer_thread( + &path_temp_incomplete_sorted, + &thread_pool_write, + &list_hist_incomplete, + false, + ¶ms_io.path_tmp, + ) + .expect("Failed to get writer threads"); + + // Start worker threads. + // Limit how many chunks can be in the air at the same time, as writers must be able to keep up with the reader + let thread_pool_work = threadpool::ThreadPool::new(params_io.threads_work); + let (tx, rx) = crossbeam::channel::bounded::>(100); + let (tx, rx) = (Arc::new(tx), Arc::new(rx)); + for tidx in 0..params_io.threads_work { + let rx = Arc::clone(&rx); + let tx_writer_complete = Arc::clone(&tx_writer_complete); + let tx_writer_incomplete = Arc::clone(&tx_writer_incomplete); + + println!("Starting worker thread {}", tidx); + + let mut barcodes = barcodes.clone(); //This is needed to keep mutating the pattern in this structure + + thread_pool_work.execute(move || { + while let Ok(Some(list_bam_cell)) = rx.recv() { + let mut pairs_complete: Vec<(shard::ReadPair, shard::CellID)> = + Vec::with_capacity(list_bam_cell.len()); + let mut pairs_incomplete: Vec<(shard::ReadPair, shard::CellID)> = + Vec::with_capacity(list_bam_cell.len()); + + for bam_cell in list_bam_cell.iter() { + let (is_ok, cellid, readpair) = barcodes.detect_barcode_and_trim( + &bam_cell.forward_record.seq(), + &bam_cell.forward_record.qual(), + &bam_cell.reverse_record.seq(), + &bam_cell.reverse_record.qual(), + ); + + if is_ok { + pairs_complete.push((readpair, cellid)); + } else { + pairs_incomplete.push((readpair, cellid)); + } + } + + let _ = tx_writer_complete.send(Some(Arc::new(pairs_complete))); + let _ = tx_writer_incomplete.send(Some(Arc::new(pairs_incomplete))); + } + }); + } + + // Read the fastq files, send to worker threads + println!("Starting to read input file"); + read_all_reads(&mut forward_file, &mut reverse_file, &tx); + + // Send termination signals to workers, then wait for them to complete + for _ in 0..params_io.threads_work { + let _ = tx.send(None); + } + thread_pool_work.join(); + + // Send termination signals to writers, then wait for them to complete + let _ = tx_writer_complete.send(None); + let _ = tx_writer_incomplete.send(None); + thread_pool_write.join(); + + //Sort the complete output files and compress the output. + let mut list_inputfiles: Vec = Vec::new(); + list_inputfiles.push(path_temp_complete_sorted.clone()); + catsort_files( + &list_inputfiles, + ¶ms_io.path_output_complete, + params_io.sort, + ); + + //// Concatenate also the incomplete reads. Sorting never needed + let mut list_inputfiles: Vec = Vec::new(); + list_inputfiles.push(path_temp_incomplete_sorted.clone()); + catsort_files(&list_inputfiles, ¶ms_io.path_output_incomplete, false); + + //// Index the final file with tabix + println!("Indexing final output file"); + tirp::index_tirp(¶ms_io.path_output_complete).expect("Failed to index file"); + + //// Store histogram + println!("Storing histogram for final output file"); + debug!("Collecting histograms"); + sum_and_store_histogram( + &list_hist_complete, + &tirp::get_histogram_path_for_tirp(¶ms_io.path_output_complete), + ); + sum_and_store_histogram( + &list_hist_incomplete, + &tirp::get_histogram_path_for_tirp(¶ms_io.path_output_incomplete), + ); + + //// Remove temp files + debug!("Removing temp files"); + _ = fs::remove_dir_all(¶ms_io.path_tmp); + + info!("done!"); + + Ok(()) + } + + /////// todo keep this or not? + // TODO can we use rayon to declare the number of threads project-wise? + fn resolve_thread_config(&self) -> Result { + let available_threads = thread::available_parallelism() + .map_err(|e| anyhow::anyhow!("Failed to get available threads: {}", e))? + .get(); + + if available_threads < 2 { + println!("Warning: less than two threads reported to be available"); + } + + Ok(available_threads - 1) + } +} + +//////////////// Writer to TIRP format, sorting on the fly +// TODO refactor, move from prepare.rs (true for all functions below) +pub fn create_writer_thread( + outfile: &PathBuf, + thread_pool: &threadpool::ThreadPool, + list_hist: &Arc>>, + sort: bool, + tempdir: &PathBuf, +) -> anyhow::Result>>> { + let outfile = outfile.clone(); + + let list_hist = Arc::clone(list_hist); + let tempdir = tempdir.clone(); + //Create input read queue + //Limit how many chunks can be in pipe + let (tx, rx) = crossbeam::channel::bounded::>(100); + let (tx, rx) = (Arc::new(tx), Arc::new(rx)); + + thread_pool.execute(move || { + // Open output file + println!("Creating pre-TIRP output file: {}", outfile.display()); + + let file_output = File::create(outfile).unwrap(); + + let mut hist = shard::BarcodeHistogram::new(); + + if sort { + //Pipe to sort, then to given file + let mut cmd = Command::new("sort"); + cmd.arg(format!("--temporary-directory={}", tempdir.display())); + + let mut process = cmd + .stdin(Stdio::piped()) + .stdout(Stdio::from(file_output)) + .spawn() + .expect("failed to start sorter"); + + let mut stdin = process.stdin.as_mut().unwrap(); + + debug!("sorter process ready"); + loop_tirp_writer(&rx, &mut hist, &mut stdin); + + //Wait for process to finish + debug!("Waiting for sorter process to exit"); + let _result = process.wait().unwrap(); + + //TODO how to terminate pipe?? seems to happen now anyway + } else { + debug!("starting non-sorted write loop"); + + let mut writer = BufWriter::new(file_output); //TODO put in a buffered writer in loop. no need to do twice + loop_tirp_writer(&rx, &mut hist, &mut writer); + _ = writer.flush(); + } + + //Keep histogram for final summary + { + let mut list_hist = list_hist.lock().unwrap(); //: Mutex> + list_hist.push(hist); + } + // _ = tx_hist.send(Arc::new(hist)); + }); + Ok(tx) +} + +///// loop for a writer thread, sending output to a Writer +pub fn loop_tirp_writer( + rx: &Arc>>, + hist: &mut shard::BarcodeHistogram, + writer: W, +) where + W: Write, +{ + let mut writer = BufWriter::new(writer); + + // Write reads + let mut n_written = 0; + while let Ok(Some(list_pairs)) = rx.recv() { + for (bam_cell, cell_id) in list_pairs.iter() { + tirp::write_records_pair_to_tirp( + //:: + &mut writer, + &cell_id, + &bam_cell, + ); + + hist.inc(&cell_id); + + if n_written % 100000 == 0 { + println!("#reads written to outfile: {:?}", n_written); + } + n_written = n_written + 1; + } + } + + //absolutely have to call this before dropping, for bufwriter + _ = writer.flush(); +} + +////////// read the reads, send to next threads +fn read_all_reads( + forward_file: &mut FastqReader>, + reverse_file: &mut FastqReader>, + tx: &Arc>>, +) { + let mut num_read = 0; + loop { + //Read out chunks. By sending in blocks, we can + //1. keep threads asleep until they got enough work to do to motivate waking them up + //2. avoid send operations, which likely aren't for free + let chunk_size = 1000; + + let mut curit = 0; + let mut list_recpair: Vec = Vec::with_capacity(chunk_size); + while curit < chunk_size { + if let Some(record) = reverse_file.next() { + let reverse_record: seq_io::fastq::RefRecord<'_> = + record.expect("Error reading record rev"); + let forward_record = forward_file + .next() + .unwrap() + .expect("Error reading record fwd"); + + let recpair = RecordPair { + reverse_record: reverse_record.to_owned_record(), + forward_record: forward_record.to_owned_record(), + }; + list_recpair.push(recpair); + + num_read = num_read + 1; + + if num_read % 100000 == 0 { + println!("read: {:?}", num_read); + } + } else { + break; + } + curit += 1; + } + + if !list_recpair.is_empty() { + let _ = tx.send(Some(Arc::new(list_recpair))); + } else { + break; + } + } +} + +/// Concatenate or merge sort files, then process them with bgzip +// sort --merge some_sorted.pregascet.0 some_sorted.pregascet.1 ... | bgzip -c /dev/stdin > test.gascet.0.gz +// +// we could also skip bgsort but it should be fast; we need to recompress it later otherwise. but it is a huge gain in ratio and need for temporary space! +// also, output file is ready for use unless merging with other shards is needed +// +// later index: tabix -p bed test.gascet.0.gz +// able to get a cell: tabix out_complete.0.gascet.gz A1_H5_D9_H12 | head +pub fn catsort_files(list_inputfiles: &Vec, path_final: &PathBuf, sort: bool) { + let use_bgzip = true; + + //Final destination file + let file_final_output = File::create(path_final).unwrap(); + println!( + "Compressing and writing final output file: {:?} from input files {:?}", + path_final, list_inputfiles + ); + + //Compress on the fly with bgzip, pipe output to a file + let mut process_b = if use_bgzip { + let mut process_b = Command::new("bgzip"); + process_b.arg("-c").arg("/dev/stdin"); + process_b + } else { + // for testing on osx without bgzip + print!("Warning: using gzip for final file. This will not work with tabix later. Not recommended"); + Command::new("gzip") + }; + let process_b = process_b + .stdin(Stdio::piped()) + .stdout(Stdio::from(file_final_output)) + .spawn() + .expect("Failed to start zip-command"); + + //Sort or concatenate + let mut process_a = if sort { + let mut cmd = Command::new("sort"); + cmd.arg("--merge"); + cmd + } else { + Command::new("cat") + }; + + //Provide all previously written output files to sort/cat + let list_inputfiles: Vec = list_inputfiles + .iter() + .map(|p| { + p.to_str() + .expect("failed to convert path to string") + .to_string() + }) + .collect(); + process_a.args(list_inputfiles); + + //Wait for the process to finish + let out = process_a + .stdout(process_b.stdin.expect("failed to get stdin on bgzip")) + .output() + .expect("failed to get result from bgzip"); + println!("{}", String::from_utf8(out.stdout).unwrap()); +} + +pub fn sum_and_store_histogram( + list_hist: &Arc>>, + path: &PathBuf, +) { + debug!("Collecting histograms"); + + let list_hist = list_hist.lock().unwrap(); + + let mut totalhist = shard::BarcodeHistogram::new(); + for one_hist in list_hist.iter() { + //let one_hist = rx.recv().expect("Could not get one histrogram"); + totalhist.add_histogram(&one_hist); + } + totalhist + .write(&path) + .expect(format!("Failed to write histogram to {:?}", path).as_str()); +} diff --git a/src/cmd/query_cmd.rs b/src/subcommands/query.rs similarity index 100% rename from src/cmd/query_cmd.rs rename to src/subcommands/query.rs diff --git a/src/cmd/shardify_cmd.rs b/src/subcommands/shardify.rs similarity index 100% rename from src/cmd/shardify_cmd.rs rename to src/subcommands/shardify.rs diff --git a/src/cmd/transform_cmd.rs b/src/subcommands/transform.rs similarity index 100% rename from src/cmd/transform_cmd.rs rename to src/subcommands/transform.rs