diff --git a/crates/polars-io/src/csv/read/mod.rs b/crates/polars-io/src/csv/read/mod.rs index 2b8f0de59e6e..a707abafb0e6 100644 --- a/crates/polars-io/src/csv/read/mod.rs +++ b/crates/polars-io/src/csv/read/mod.rs @@ -26,7 +26,7 @@ mod splitfields; mod utils; pub use options::{CommentPrefix, CsvEncoding, CsvParseOptions, CsvReadOptions, NullValues}; -pub use parser::{count_rows, count_rows_from_slice}; +pub use parser::{count_rows, count_rows_from_slice, count_rows_from_slice_par}; pub use read_impl::batched::{BatchedCsvReader, OwnedBatchedCsvReader}; pub use reader::CsvReader; pub use schema_inference::infer_file_schema; diff --git a/crates/polars-io/src/csv/read/parser.rs b/crates/polars-io/src/csv/read/parser.rs index 8d1bb72dc881..2751cd56e035 100644 --- a/crates/polars-io/src/csv/read/parser.rs +++ b/crates/polars-io/src/csv/read/parser.rs @@ -5,7 +5,6 @@ use num_traits::Pow; use polars_core::prelude::*; use polars_core::{config, POOL}; use polars_error::feature_gated; -use polars_utils::index::Bounded; use polars_utils::select::select_unpredictable; use rayon::prelude::*; @@ -43,7 +42,7 @@ pub fn count_rows( let owned = &mut vec![]; let reader_bytes = maybe_decompress_bytes(mmap.as_ref(), owned)?; - count_rows_from_slice( + count_rows_from_slice_par( reader_bytes, separator, quote_char, @@ -55,7 +54,7 @@ /// Read the number of rows without parsing columns /// useful for count(*) queries -pub fn count_rows_from_slice( +pub fn count_rows_from_slice_par( mut bytes: &[u8], separator: u8, quote_char: Option<u8>, @@ -89,27 +88,55 @@ }) .unwrap_or(1); + if n_threads == 1 { + return count_rows_from_slice(bytes, quote_char, comment_prefix, eol_char, has_header); + } + let file_chunks: Vec<(usize, usize)> = get_file_chunks(bytes, n_threads, None, separator, quote_char, eol_char); let iter = file_chunks.into_par_iter().map(|(start, stop)| { - let local_bytes = &bytes[start..stop]; - let row_iterator = 
SplitLines::new(local_bytes, quote_char, eol_char, comment_prefix); + let bytes = &bytes[start..stop]; + if comment_prefix.is_some() { - Ok(row_iterator - .filter(|line| !line.is_empty() && !is_comment_line(line, comment_prefix)) - .count()) + SplitLines::new(bytes, quote_char, eol_char, comment_prefix) + .filter(|line| !is_comment_line(line, comment_prefix)) + .count() } else { - Ok(row_iterator.count()) + CountLines::new(quote_char, eol_char).count(bytes).0 } }); - let count_result: PolarsResult<usize> = POOL.install(|| iter.sum()); + let n: usize = POOL.install(|| iter.sum()); + + Ok(n - (has_header as usize)) +} + +/// Read the number of rows without parsing columns +pub fn count_rows_from_slice( + mut bytes: &[u8], + quote_char: Option<u8>, + comment_prefix: Option<&CommentPrefix>, + eol_char: u8, + has_header: bool, +) -> PolarsResult<usize> { + for _ in 0..bytes.len() { + if bytes[0] != eol_char { + break; + } - match count_result { - Ok(val) => Ok(val - (has_header as usize)), - Err(err) => Err(err), + bytes = &bytes[1..]; } + + let n = if comment_prefix.is_some() { + SplitLines::new(bytes, quote_char, eol_char, comment_prefix) + .filter(|line| !is_comment_line(line, comment_prefix)) + .count() + } else { + CountLines::new(quote_char, eol_char).count(bytes).0 + }; + + Ok(n - (has_header as usize)) } /// Skip the utf-8 Byte Order Mark. @@ -740,7 +767,7 @@ impl CountLines { } } - // Returns count and offset in slice + /// Returns count and offset to split for remainder in slice. 
#[cfg(feature = "simd")] pub fn count(&self, bytes: &[u8]) -> (usize, usize) { let mut total_idx = 0; diff --git a/crates/polars-io/src/csv/read/utils.rs b/crates/polars-io/src/csv/read/utils.rs index fa46fb765756..f377557f5142 100644 --- a/crates/polars-io/src/csv/read/utils.rs +++ b/crates/polars-io/src/csv/read/utils.rs @@ -7,6 +7,9 @@ use super::parser::next_line_position; use super::parser::next_line_position_naive; use super::splitfields::SplitFields; +/// TODO: Remove this in favor of parallel CountLines::analyze_chunk +/// +/// (see https://github.com/pola-rs/polars/issues/19078) pub(crate) fn get_file_chunks( bytes: &[u8], n_chunks: usize, diff --git a/crates/polars-mem-engine/src/executors/scan/csv.rs b/crates/polars-mem-engine/src/executors/scan/csv.rs index 9599a47585e8..789b09c01e1d 100644 --- a/crates/polars-mem-engine/src/executors/scan/csv.rs +++ b/crates/polars-mem-engine/src/executors/scan/csv.rs @@ -292,7 +292,7 @@ impl ScanExec for CsvExec { let bytes = maybe_decompress_bytes(&memslice, owned)?; - let num_rows = count_rows_from_slice( + let num_rows = polars_io::csv::read::count_rows_from_slice_par( bytes, popt.separator, popt.quote_char, diff --git a/crates/polars-plan/src/plans/functions/count.rs b/crates/polars-plan/src/plans/functions/count.rs index 14f117e5dfaf..acd3328709d1 100644 --- a/crates/polars-plan/src/plans/functions/count.rs +++ b/crates/polars-plan/src/plans/functions/count.rs @@ -9,10 +9,6 @@ use arrow::io::ipc::read::get_row_count as count_rows_ipc_sync; use polars_core::error::feature_gated; #[cfg(any(feature = "parquet", feature = "json"))] use polars_io::cloud::CloudOptions; -#[cfg(feature = "csv")] -use polars_io::csv::read::{ - count_rows as count_rows_csv, count_rows_from_slice as count_rows_csv_from_slice, -}; #[cfg(all(feature = "parquet", feature = "async"))] use polars_io::parquet::read::ParquetAsyncReader; #[cfg(feature = "parquet")] @@ -96,7 +92,7 @@ fn count_all_rows_csv( sources .iter() .map(|source| match source 
{ - ScanSourceRef::Path(path) => count_rows_csv( + ScanSourceRef::Path(path) => polars_io::csv::read::count_rows( path, parse_options.separator, parse_options.quote_char, @@ -107,7 +103,7 @@ fn count_all_rows_csv( _ => { let memslice = source.to_memslice()?; - count_rows_csv_from_slice( + polars_io::csv::read::count_rows_from_slice_par( &memslice[..], parse_options.separator, parse_options.quote_char, diff --git a/crates/polars-stream/src/nodes/io_sources/csv.rs b/crates/polars-stream/src/nodes/io_sources/csv.rs index d055c0bd3227..4eb1a7b7e32f 100644 --- a/crates/polars-stream/src/nodes/io_sources/csv.rs +++ b/crates/polars-stream/src/nodes/io_sources/csv.rs @@ -620,7 +620,6 @@ impl MultiScanable for CsvSourceNode { let num_rows = polars_io::csv::read::count_rows_from_slice( &mem_slice[..], - parse_options.separator, parse_options.quote_char, parse_options.comment_prefix.as_ref(), parse_options.eol_char, diff --git a/py-polars/tests/unit/io/test_lazy_count_star.py b/py-polars/tests/unit/io/test_lazy_count_star.py index a3723f2ad939..19188340100b 100644 --- a/py-polars/tests/unit/io/test_lazy_count_star.py +++ b/py-polars/tests/unit/io/test_lazy_count_star.py @@ -27,6 +27,24 @@ def test_count_csv(io_files_path: Path, path: str, n_rows: int) -> None: assert_frame_equal(lf.collect(), expected) +def test_count_csv_comment_char() -> None: + q = pl.scan_csv( + b""" +a,b +1,2 + +# +3,4 +""", + comment_prefix="#", + ) + + assert_frame_equal( + q.collect(), pl.DataFrame({"a": [1, None, 3], "b": [2, None, 4]}) + ) + assert q.select(pl.len()).collect().item() == 3 + + @pytest.mark.write_disk def test_commented_csv() -> None: with NamedTemporaryFile() as csv_a: