use flate2::read::MultiGzDecoder;
use pyo3::prelude::*;
use rayon::prelude::*;
use rayon::ThreadPoolBuilder;
use std::fmt;
use std::fmt::{Display, Formatter};
use std::fs::File;
use std::io::{BufRead, BufReader, Error};
use std::iter::Sum;
use std::ops::Add;
use std::path::PathBuf;
#[pyclass(eq, frozen)]
#[derive(Debug, PartialEq)]
pub struct ReadsStats {
#[pyo3(get)]
pub read_count: u64,
#[pyo3(get)]
pub basepair_count: u64,
}
impl Display for ReadsStats {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(
f,
"ReadsStats(read_count={}, basepair_count={})",
self.read_count, self.basepair_count
)
}
}
impl Add for ReadsStats {
type Output = Self;
fn add(self, other: Self) -> Self {
Self {
read_count: self.read_count + other.read_count,
basepair_count: self.basepair_count + other.basepair_count,
}
}
}
impl Sum for ReadsStats {
fn sum<I: Iterator<Item = Self>>(iter: I) -> Self {
iter.fold(
Self {
read_count: 0,
basepair_count: 0,
},
|a, b| a + b,
)
}
}
#[pymethods]
impl ReadsStats {
#[new]
fn new(read_count: u64, basepair_count: u64) -> Self {
Self {
read_count,
basepair_count,
}
}
fn __repr__(&self) -> String {
format!("{self}")
}
fn __add__(&self, other: &Self) -> Self {
ReadsStats {
read_count: self.read_count + other.read_count,
basepair_count: self.basepair_count + other.basepair_count,
}
}
}
#[pymodule]
fn fastq_stats(module: &Bound<'_, PyModule>) -> PyResult<()> {
module.add_class::<ReadsStats>()?;
module.add_function(wrap_pyfunction!(analyze_file, module)?)?;
module.add_function(wrap_pyfunction!(analyze_files, module)?)?;
Ok(())
}
#[derive(FromPyObject, Clone)]
pub enum PathLike {
String(String),
Path(PathBuf),
}
#[pyfunction]
#[pyo3(signature = (filepaths, *, num_threads=None))]
#[allow(clippy::needless_pass_by_value)] pub fn analyze_files(
filepaths: Vec<PathLike>,
num_threads: Option<usize>,
) -> Result<ReadsStats, Error> {
let task = || {
filepaths
.par_iter()
.map(|filepath| analyze_file(filepath.clone()))
.sum()
};
match num_threads {
None => task(), Some(threads) => {
let pool = ThreadPoolBuilder::new()
.num_threads(threads)
.build()
.expect("Failed to build thread pool");
pool.install(task)
}
}
}
#[pyfunction]
pub fn analyze_file(filepath: PathLike) -> Result<ReadsStats, Error> {
let file = match filepath {
PathLike::String(path) => File::open(path),
PathLike::Path(path) => File::open(path),
}?;
let gz_reader = MultiGzDecoder::new(file);
let buf_gz_reader = BufReader::new(gz_reader);
let mut reads: u64 = 0;
let mut basepairs: u64 = 0;
for (linenum, line) in buf_gz_reader.lines().enumerate() {
let line = line?;
if linenum % 4 != 1 {
continue;
}
reads += 1;
basepairs += line.len() as u64;
}
Ok(ReadsStats {
read_count: reads,
basepair_count: basepairs,
})
}
#[cfg(test)]
mod tests {
use super::*;
use flate2::write::GzEncoder;
use flate2::Compression;
use std::fs;
use std::io::Write;
struct TempFile {
pub wrapped_path: PathBuf,
}
impl Drop for TempFile {
fn drop(&mut self) {
if let Ok(()) = fs::remove_file(&self.wrapped_path) {};
}
}
impl From<PathBuf> for TempFile {
fn from(path: PathBuf) -> Self {
Self { wrapped_path: path }
}
}
impl From<&str> for TempFile {
fn from(path: &str) -> Self {
Self {
wrapped_path: PathBuf::from(path),
}
}
}
#[test]
fn test_single_read_single_gz() {
let f = TempFile::from("testfile1.fastq.gz");
let mut buf = Vec::new();
let mut encoder = GzEncoder::new(&mut buf, Compression::default());
encoder.write_all(b"@seq1\nACTG\n+\nooaa\n").unwrap();
encoder.flush().unwrap();
encoder.finish().unwrap();
fs::write(&f.wrapped_path, buf).unwrap();
let result = analyze_file(PathLike::Path(f.wrapped_path.clone()).clone()).unwrap();
assert_eq!(result.read_count, 1);
assert_eq!(result.basepair_count, 4);
}
#[test]
fn test_multi_read_single_gz() {
let f = TempFile::from("testfile2.fastq.gz");
let mut buf = Vec::new();
let mut encoder = GzEncoder::new(&mut buf, Compression::default());
encoder
.write_all(b"@seq1\nACTG\n+\nooaa\n@seq2\nAAA\n+\naao\n")
.unwrap();
encoder.flush().unwrap();
encoder.finish().unwrap();
fs::write(&f.wrapped_path, buf).unwrap();
let result = analyze_file(PathLike::Path(f.wrapped_path.clone()).clone()).unwrap();
assert_eq!(result.read_count, 2);
assert_eq!(result.basepair_count, 7);
}
#[test]
fn test_multi_read_multi_gz() {
let f = TempFile::from("testfile3.fastq.gz");
let mut buf1 = Vec::new();
let mut encoder = GzEncoder::new(&mut buf1, Compression::default());
encoder.write_all(b"@seq1\nACTG\n+\nooaa\n").unwrap();
encoder.flush().unwrap();
encoder.finish().unwrap();
let mut encoder = GzEncoder::new(&mut buf1, Compression::default());
encoder.write_all(b"@seq2\nACA\n+\nooa\n").unwrap();
encoder.flush().unwrap();
encoder.finish().unwrap();
fs::write(&f.wrapped_path, buf1).unwrap();
let result = analyze_file(PathLike::Path(f.wrapped_path.clone()).clone()).unwrap();
assert_eq!(result.read_count, 2);
assert_eq!(result.basepair_count, 7);
}
}