Commit 9e25684

nameexhaustion committed Mar 4, 2025
1 parent b9a5bc0 commit 9e25684
Showing 7 changed files with 66 additions and 23 deletions.
2 changes: 1 addition & 1 deletion crates/polars-io/src/csv/read/mod.rs
@@ -26,7 +26,7 @@ mod splitfields;
 mod utils;
 
 pub use options::{CommentPrefix, CsvEncoding, CsvParseOptions, CsvReadOptions, NullValues};
-pub use parser::{count_rows, count_rows_from_slice};
+pub use parser::{count_rows, count_rows_from_slice, count_rows_from_slice_par};
 pub use read_impl::batched::{BatchedCsvReader, OwnedBatchedCsvReader};
 pub use reader::CsvReader;
 pub use schema_inference::infer_file_schema;
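Note on the re-export above: the crate's public CSV API now exposes count_rows_from_slice_par next to the existing count_rows and count_rows_from_slice helpers. A minimal usage sketch follows. It is illustrative only: the trailing parameters of count_rows_from_slice_par (comment prefix, EOL character, header flag) are assumed to carry over unchanged from the previous count_rows_from_slice signature, since the truncated hunks below do not show the full parameter lists.

use polars_io::csv::read::{count_rows_from_slice, count_rows_from_slice_par};

fn main() {
    let csv = b"a,b\n1,2\n3,4\n";

    // Parallel variant: still takes the separator so the buffer can be split into chunks.
    // The trailing arguments (comment prefix, eol char, has_header) are assumed, not shown above.
    let n_par = count_rows_from_slice_par(csv, b',', Some(b'"'), None, b'\n', true).unwrap();

    // Sequential variant: the separator argument was dropped; it only scans for line endings.
    let n_seq = count_rows_from_slice(csv, Some(b'"'), None, b'\n', true).unwrap();

    // Both should report 2 data rows, with the header line subtracted.
    println!("parallel: {n_par}, sequential: {n_seq}");
}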
55 changes: 41 additions & 14 deletions crates/polars-io/src/csv/read/parser.rs
@@ -5,7 +5,6 @@ use num_traits::Pow;
 use polars_core::prelude::*;
 use polars_core::{config, POOL};
 use polars_error::feature_gated;
-use polars_utils::index::Bounded;
 use polars_utils::select::select_unpredictable;
 use rayon::prelude::*;
 
@@ -43,7 +42,7 @@ pub fn count_rows(
     let owned = &mut vec![];
     let reader_bytes = maybe_decompress_bytes(mmap.as_ref(), owned)?;
 
-    count_rows_from_slice(
+    count_rows_from_slice_par(
         reader_bytes,
         separator,
         quote_char,
@@ -55,7 +54,7 @@
 
 /// Read the number of rows without parsing columns
 /// useful for count(*) queries
-pub fn count_rows_from_slice(
+pub fn count_rows_from_slice_par(
     mut bytes: &[u8],
     separator: u8,
     quote_char: Option<u8>,
@@ -89,27 +88,55 @@
     })
     .unwrap_or(1);
 
+    if n_threads == 1 {
+        return count_rows_from_slice(bytes, quote_char, comment_prefix, eol_char, has_header);
+    }
+
     let file_chunks: Vec<(usize, usize)> =
         get_file_chunks(bytes, n_threads, None, separator, quote_char, eol_char);
 
     let iter = file_chunks.into_par_iter().map(|(start, stop)| {
-        let local_bytes = &bytes[start..stop];
-        let row_iterator = SplitLines::new(local_bytes, quote_char, eol_char, comment_prefix);
+        let bytes = &bytes[start..stop];
+
         if comment_prefix.is_some() {
-            Ok(row_iterator
-                .filter(|line| !line.is_empty() && !is_comment_line(line, comment_prefix))
-                .count())
+            SplitLines::new(bytes, quote_char, eol_char, comment_prefix)
+                .filter(|line| !is_comment_line(line, comment_prefix))
+                .count()
         } else {
-            Ok(row_iterator.count())
+            CountLines::new(quote_char, eol_char).count(bytes).0
         }
     });
 
-    let count_result: PolarsResult<usize> = POOL.install(|| iter.sum());
+    let n: usize = POOL.install(|| iter.sum());
+
+    Ok(n - (has_header as usize))
+}
+
+/// Read the number of rows without parsing columns
+pub fn count_rows_from_slice(
+    mut bytes: &[u8],
+    quote_char: Option<u8>,
+    comment_prefix: Option<&CommentPrefix>,
+    eol_char: u8,
+    has_header: bool,
+) -> PolarsResult<usize> {
+    for _ in 0..bytes.len() {
+        if bytes[0] != eol_char {
+            break;
+        }
 
-    match count_result {
-        Ok(val) => Ok(val - (has_header as usize)),
-        Err(err) => Err(err),
+        bytes = &bytes[1..];
     }
+
+    let n = if comment_prefix.is_some() {
+        SplitLines::new(bytes, quote_char, eol_char, comment_prefix)
+            .filter(|line| !is_comment_line(line, comment_prefix))
+            .count()
+    } else {
+        CountLines::new(quote_char, eol_char).count(bytes).0
+    };
+
+    Ok(n - (has_header as usize))
+}
 
 /// Skip the utf-8 Byte Order Mark.
@@ -740,7 +767,7 @@ impl CountLines {
         }
     }
 
-    // Returns count and offset in slice
+    /// Returns count and offset to split for remainder in slice.
    #[cfg(feature = "simd")]
    pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
        let mut total_idx = 0;
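What the parser.rs change amounts to: when more than one thread is available, count_rows_from_slice_par splits the buffer into row-aligned chunks via get_file_chunks, counts each chunk on the rayon POOL (SplitLines when a comment prefix must be respected, otherwise the CountLines counter), and sums the per-chunk counts. The per-chunk work no longer returns a PolarsResult, so the final match unwrapping disappears, and the single-threaded case now short-circuits into the new sequential count_rows_from_slice. The sketch below is a simplified, self-contained model of that chunk-and-sum strategy; the function name count_rows_parallel is hypothetical, and plain newline counting stands in for CountLines, so quoted fields and comment lines are ignored.

use rayon::prelude::*;

// Simplified model of the parallel row count: split the buffer into roughly
// equal chunks, extend each chunk to the next newline so no row straddles a
// boundary, then count newlines per chunk in parallel and sum the results.
fn count_rows_parallel(bytes: &[u8], n_threads: usize) -> usize {
    if bytes.is_empty() || n_threads == 0 {
        return 0;
    }
    let chunk_size = bytes.len().div_ceil(n_threads);

    let mut chunks = Vec::new();
    let mut start = 0;
    while start < bytes.len() {
        let mut end = (start + chunk_size).min(bytes.len());
        // Push the boundary forward until it sits just past a newline.
        while end < bytes.len() && bytes[end - 1] != b'\n' {
            end += 1;
        }
        chunks.push(&bytes[start..end]);
        start = end;
    }

    chunks
        .into_par_iter()
        .map(|chunk| chunk.iter().filter(|&&b| b == b'\n').count())
        .sum()
}

fn main() {
    let csv = b"a,b\n1,2\n3,4\n5,6\n";
    // Four newline-terminated lines; subtract one for the header if present.
    assert_eq!(count_rows_parallel(csv, 2), 4);
}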
3 changes: 3 additions & 0 deletions crates/polars-io/src/csv/read/utils.rs
@@ -7,6 +7,9 @@ use super::parser::next_line_position
 use super::parser::next_line_position_naive;
 use super::splitfields::SplitFields;
 
+/// TODO: Remove this in favor of parallel CountLines::analyze_chunk
+///
+/// (see https://github.com/pola-rs/polars/issues/19078)
 pub(crate) fn get_file_chunks(
     bytes: &[u8],
     n_chunks: usize,
2 changes: 1 addition & 1 deletion crates/polars-mem-engine/src/executors/scan/csv.rs
@@ -292,7 +292,7 @@ impl ScanExec for CsvExec {
 
         let bytes = maybe_decompress_bytes(&memslice, owned)?;
 
-        let num_rows = count_rows_from_slice(
+        let num_rows = polars_io::csv::read::count_rows_from_slice_par(
             bytes,
             popt.separator,
             popt.quote_char,
8 changes: 2 additions & 6 deletions crates/polars-plan/src/plans/functions/count.rs
@@ -9,10 +9,6 @@ use arrow::io::ipc::read::get_row_count as count_rows_ipc_sync;
 use polars_core::error::feature_gated;
 #[cfg(any(feature = "parquet", feature = "json"))]
 use polars_io::cloud::CloudOptions;
-#[cfg(feature = "csv")]
-use polars_io::csv::read::{
-    count_rows as count_rows_csv, count_rows_from_slice as count_rows_csv_from_slice,
-};
 #[cfg(all(feature = "parquet", feature = "async"))]
 use polars_io::parquet::read::ParquetAsyncReader;
 #[cfg(feature = "parquet")]
@@ -96,7 +92,7 @@ fn count_all_rows_csv(
     sources
         .iter()
         .map(|source| match source {
-            ScanSourceRef::Path(path) => count_rows_csv(
+            ScanSourceRef::Path(path) => polars_io::csv::read::count_rows(
                 path,
                 parse_options.separator,
                 parse_options.quote_char,
@@ -107,7 +103,7 @@
             _ => {
                 let memslice = source.to_memslice()?;
 
-                count_rows_csv_from_slice(
+                polars_io::csv::read::count_rows_from_slice_par(
                     &memslice[..],
                     parse_options.separator,
                     parse_options.quote_char,
1 change: 0 additions & 1 deletion crates/polars-stream/src/nodes/io_sources/csv.rs
@@ -620,7 +620,6 @@ impl MultiScanable for CsvSourceNode {
 
         let num_rows = polars_io::csv::read::count_rows_from_slice(
             &mem_slice[..],
-            parse_options.separator,
             parse_options.quote_char,
             parse_options.comment_prefix.as_ref(),
             parse_options.eol_char,
18 changes: 18 additions & 0 deletions py-polars/tests/unit/io/test_lazy_count_star.py
@@ -27,6 +27,24 @@ def test_count_csv(io_files_path: Path, path: str, n_rows: int) -> None:
     assert_frame_equal(lf.collect(), expected)
 
 
+def test_count_csv_comment_char() -> None:
+    q = pl.scan_csv(
+        b"""
+a,b
+1,2
+#
+3,4
+""",
+        comment_prefix="#",
+    )
+
+    assert_frame_equal(
+        q.collect(), pl.DataFrame({"a": [1, None, 3], "b": [2, None, 4]})
+    )
+    assert q.select(pl.len()).collect().item() == 3
+
+
 @pytest.mark.write_disk
 def test_commented_csv() -> None:
     with NamedTemporaryFile() as csv_a:
