diff --git a/Cargo.lock b/Cargo.lock index 223c291..187cce2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -235,6 +235,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + [[package]] name = "clap" version = "4.5.53" @@ -380,6 +386,7 @@ dependencies = [ "minreq", "needletail", "niffler", + "nix", "packed-seq", "paraseq", "parking_lot", @@ -800,6 +807,18 @@ dependencies = [ "zstd", ] +[[package]] +name = "nix" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6" +dependencies = [ + "bitflags", + "cfg-if", + "cfg_aliases", + "libc", +] + [[package]] name = "normalize-line-endings" version = "0.3.0" diff --git a/Cargo.toml b/Cargo.toml index 9c228c1..1040732 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,7 @@ assert_cmd = "2.0" predicates = "3.0" tempfile = "3.20" rstest = "0.26" +nix = { version = "0.30", features = ["fs"] } [features] # Disable for faster builds. 
diff --git a/src/filter.rs b/src/filter.rs
index dd5abc9..adf0e21 100644
--- a/src/filter.rs
+++ b/src/filter.rs
@@ -31,6 +31,32 @@ struct FilterProcessorConfig {
     debug: bool,
 }
 
+/// Returns `true` for inputs that must be streamed instead of inspected on disk.
+///
+/// A path counts as special when it is a file-descriptor path (`/dev/fd/*` or
+/// `/proc/self/fd/*`, e.g. from shell process substitution), matched by prefix
+/// only, or when it is a named pipe (FIFO) according to its metadata. Metadata
+/// errors (such as a missing file) are treated as "not special".
+fn is_special_input_path(path: &str) -> bool {
+    if path.starts_with("/dev/fd/") || path.starts_with("/proc/self/fd/") {
+        return true;
+    }
+    // FIFO detection needs Unix-only file-type extensions; gate it so the
+    // crate still compiles on other targets.
+    #[cfg(unix)]
+    {
+        use std::os::unix::fs::FileTypeExt;
+        std::path::Path::new(path)
+            .metadata()
+            .map(|m| m.file_type().is_fifo())
+            .unwrap_or(false)
+    }
+    #[cfg(not(unix))]
+    {
+        false
+    }
+}
+
 /// Check input file path(s) exist
 fn check_input_paths(config: &FilterConfig) -> Result<()> {
     if !config.minimizers_path.exists() {
@@ -40,8 +66,11 @@ fn check_input_paths(config: &FilterConfig) -> Result<()> {
         ));
     }
 
-    // Skip stdin case
-    if config.input_path != "-" && !std::path::Path::new(config.input_path).exists() {
+    // Skip stdin and pipe-like inputs
+    if config.input_path != "-"
+        && !is_special_input_path(config.input_path)
+        && !std::path::Path::new(config.input_path).exists()
+    {
         return Err(anyhow::anyhow!(
             "Input file does not exist: {}",
             config.input_path
@@ -49,7 +78,10 @@ fn check_input_paths(config: &FilterConfig) -> Result<()> {
     }
 
     if let Some(input2_path) = config.input2_path {
-        if input2_path != "-" && !std::path::Path::new(input2_path).exists() {
+        if input2_path != "-"
+            && !is_special_input_path(input2_path)
+            && !std::path::Path::new(input2_path).exists()
+        {
             return Err(anyhow::anyhow!(
                 "Second input file does not exist: {}",
                 input2_path
@@ -62,8 +94,8 @@ fn check_input_paths(config: &FilterConfig) -> Result<()> {
 
 /// Check if file metadata len < 5 (catches empty uncompressed files only)
 fn is_empty_file(path: &str) -> Result<bool> {
-    if path == "-" {
-        return Ok(false); // Can't check stdin
+    if path == "-" || is_special_input_path(path) {
+        return Ok(false); // Can't check stdin or pipe-like inputs
     }
     let metadata = std::fs::metadata(path)
         .map_err(|e| anyhow::anyhow!("Failed to read file metadata {}: {}", path, e))?;
diff --git a/tests/filter_tests.rs b/tests/filter_tests.rs
index 4c82ed7..b7967cf 100644
--- a/tests/filter_tests.rs
+++ b/tests/filter_tests.rs
@@ -1944,3 +1944,86 @@ fn test_thread_allocation_no_compression() {
         .stderr(predicates::str::contains("threads=8"))
         .stderr(predicates::str::contains("threads=8(").not());
 }
+
+/// A named pipe (FIFO) must be accepted as the filter's input path.
+#[test]
+#[cfg(unix)]
+fn test_filter_with_named_pipe() {
+    use nix::sys::stat::Mode;
+    use nix::unistd::mkfifo;
+    use std::io::Write;
+    use std::time::{Duration, Instant};
+
+    let temp_dir = tempdir().unwrap();
+    let fasta_path = temp_dir.path().join("ref.fasta");
+    let bin_path = temp_dir.path().join("ref.bin");
+    let fifo_path = temp_dir.path().join("input.fifo");
+    let output_path = temp_dir.path().join("filtered.fastq");
+
+    // Create reference and build index
+    create_test_fasta(&fasta_path);
+    build_index(&fasta_path, &bin_path);
+
+    // Create a named pipe (FIFO)
+    mkfifo(&fifo_path, Mode::S_IRWXU).expect("Failed to create FIFO");
+
+    // Feed the FIFO from a separate thread: opening it for writing blocks until
+    // the filter process opens the read end.
+    let fifo_path_clone = fifo_path.clone();
+    let writer_thread = std::thread::spawn(move || {
+        // Open write-only without create/truncate so a missing FIFO is an error
+        // rather than a silently created regular file.
+        let mut file = fs::OpenOptions::new()
+            .write(true)
+            .open(&fifo_path_clone)
+            .expect("Failed to open FIFO for writing");
+        // Write FASTQ data that matches the reference
+        let fastq_data = "@seq1\nACGTGCATAGCTGCATGCATGCATGCATGCATGCATGCAATGCAACGTGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCA\n+\n~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n";
+        file.write_all(fastq_data.as_bytes())
+            .expect("Failed to write to FIFO");
+    });
+
+    // Run filter with FIFO as input
+    let output = StdCommand::new(cargo::cargo_bin!("deacon"))
+        .arg("filter")
+        .arg("-a")
+        .arg("1")
+        .arg("-r")
+        .arg("0.0")
+        .arg(&bin_path)
+        .arg(&fifo_path)
+        .arg("--output")
+        .arg(&output_path)
+        .output()
+        .expect("Failed to execute command");
+
+    // Check the exit status before touching the writer: if the filter never
+    // opened the FIFO, the writer is still blocked in open() and an unconditional
+    // join would hang instead of reporting the filter's stderr.
+    assert!(
+        output.status.success(),
+        "Filter command failed: {}",
+        String::from_utf8_lossy(&output.stderr)
+    );
+
+    // Bound the wait for the writer so a filter that exits successfully without
+    // reading the FIFO fails the test instead of hanging it.
+    let deadline = Instant::now() + Duration::from_secs(10);
+    while !writer_thread.is_finished() {
+        assert!(
+            Instant::now() < deadline,
+            "Writer thread is still blocked; the filter never read the FIFO"
+        );
+        std::thread::sleep(Duration::from_millis(10));
+    }
+    writer_thread.join().expect("Writer thread panicked");
+
+    assert!(output_path.exists(), "Output file wasn't created");
+
+    // Verify output contains the filtered sequence
+    let output_content = fs::read_to_string(&output_path).unwrap();
+    assert!(
+        output_content.contains("@seq1"),
+        "Output should contain the sequence"
+    );
+}