Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ assert_cmd = "2.0"
predicates = "3.0"
tempfile = "3.20"
rstest = "0.26"
nix = { version = "0.30", features = ["fs"] }

[features]
# Disable for faster builds.
Expand Down
26 changes: 21 additions & 5 deletions src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,17 @@ struct FilterProcessorConfig {
debug: bool,
}

/// Check if path is a named pipe or process substitution / /dev/fd/*
fn is_special_input_path(path: &str) -> bool {
use std::os::unix::fs::FileTypeExt;
path.starts_with("/dev/fd/")
|| path.starts_with("/proc/self/fd/")
|| std::path::Path::new(path)
.metadata()
.map(|m| m.file_type().is_fifo())
.unwrap_or(false)
}

/// Check input file path(s) exist
fn check_input_paths(config: &FilterConfig) -> Result<()> {
if !config.minimizers_path.exists() {
Expand All @@ -40,16 +51,21 @@ fn check_input_paths(config: &FilterConfig) -> Result<()> {
));
}

// Skip stdin case
if config.input_path != "-" && !std::path::Path::new(config.input_path).exists() {
if config.input_path != "-"
&& !is_special_input_path(config.input_path)
&& !std::path::Path::new(config.input_path).exists()
{
return Err(anyhow::anyhow!(
"Input file does not exist: {}",
config.input_path
));
}

if let Some(input2_path) = config.input2_path {
if input2_path != "-" && !std::path::Path::new(input2_path).exists() {
if input2_path != "-"
&& !is_special_input_path(input2_path)
&& !std::path::Path::new(input2_path).exists()
{
return Err(anyhow::anyhow!(
"Second input file does not exist: {}",
input2_path
Expand All @@ -62,8 +78,8 @@ fn check_input_paths(config: &FilterConfig) -> Result<()> {

/// Check if file metadata len < 5 (catches empty uncompressed files only)
fn is_empty_file(path: &str) -> Result<bool> {
if path == "-" {
return Ok(false); // Can't check stdin
if path == "-" || is_special_input_path(path) {
return Ok(false);
}
let metadata = std::fs::metadata(path)
.map_err(|e| anyhow::anyhow!("Failed to read file metadata {}: {}", path, e))?;
Expand Down
61 changes: 61 additions & 0 deletions tests/filter_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use assert_cmd::cargo;
use predicates::prelude::*;
use std::fs;
use std::fs::File;
use std::io::Write;
use std::path::Path;
use std::process::Command as StdCommand;
use tempfile::tempdir;
Expand Down Expand Up @@ -1944,3 +1945,63 @@ fn test_thread_allocation_no_compression() {
.stderr(predicates::str::contains("threads=8"))
.stderr(predicates::str::contains("threads=8(").not());
}

#[test]
#[cfg(unix)]
fn test_filter_with_named_pipe() {
use nix::sys::stat::Mode;
use nix::unistd::mkfifo;

let temp_dir = tempdir().unwrap();
let fasta_path = temp_dir.path().join("ref.fasta");
let bin_path = temp_dir.path().join("ref.bin");
let fifo_path = temp_dir.path().join("input.fifo");
let output_path = temp_dir.path().join("filtered.fastq");

// Create reference and build index
create_test_fasta(&fasta_path);
build_index(&fasta_path, &bin_path);

// Create a named pipe (FIFO)
mkfifo(&fifo_path, Mode::S_IRWXU).expect("Failed to create FIFO");

// Spawn thread to write test data to FIFO (must happen concurrently with read)
let fifo_path_clone = fifo_path.clone();
let writer_thread = std::thread::spawn(move || {
let mut file = File::create(&fifo_path_clone).expect("Failed to open FIFO for writing");
// Write FASTQ data that matches the reference
let fastq_data = "@seq1\nACGTGCATAGCTGCATGCATGCATGCATGCATGCATGCAATGCAACGTGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCATGCA\n+\n~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n";
file.write_all(fastq_data.as_bytes())
.expect("Failed to write to FIFO");
});

// Run filter with FIFO as input
let output = StdCommand::new(cargo::cargo_bin!("deacon"))
.arg("filter")
.arg("-a")
.arg("1")
.arg("-r")
.arg("0.0")
.arg(&bin_path)
.arg(&fifo_path)
.arg("--output")
.arg(&output_path)
.output()
.expect("Failed to execute command");

writer_thread.join().expect("Writer thread panicked");

assert!(
output.status.success(),
"Filter command failed: {}",
String::from_utf8_lossy(&output.stderr)
);
assert!(output_path.exists(), "Output file wasn't created");

// Verify output contains the filtered sequence
let output_content = fs::read_to_string(&output_path).unwrap();
assert!(
output_content.contains("@seq1"),
"Output should contain the sequence"
);
}