add streaming impl and use verbose in test
coastalwhite committed Oct 16, 2024
1 parent 3cd0c62 commit 06ed71a
Showing 6 changed files with 48 additions and 23 deletions.
1 change: 1 addition & 0 deletions crates/polars-io/src/parquet/read/mod.rs
@@ -33,6 +33,7 @@ or set 'streaming'",
 
 pub use options::{ParallelStrategy, ParquetOptions};
 use polars_error::{ErrString, PolarsError};
+pub use read_impl::{create_sorting_map, try_set_sorted_flag};
 #[cfg(feature = "cloud")]
 pub use reader::ParquetAsyncReader;
 pub use reader::{BatchedParquetReader, ParquetReader};
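Note: this re-export is what makes the two helpers reachable from the streaming engine below; `row_group_data_fetch.rs` and `row_group_decode.rs` import them via `polars_io::prelude`.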
4 changes: 2 additions & 2 deletions crates/polars-io/src/parquet/read/read_impl.rs
@@ -71,7 +71,7 @@ fn should_copy_sortedness(dtype: &DataType) -> bool {
     )
 }
 
-fn try_set_sorted_flag(
+pub fn try_set_sorted_flag(
     series: &mut Series,
     col_idx: usize,
     sorting_map: &PlHashMap<usize, IsSorted>,
@@ -90,7 +90,7 @@ fn try_set_sorted_flag(
     }
 }
 
-fn create_sorting_map(md: &RowGroupMetadata) -> PlHashMap<usize, IsSorted> {
+pub fn create_sorting_map(md: &RowGroupMetadata) -> PlHashMap<usize, IsSorted> {
     let capacity = md.sorting_columns().map_or(0, |s| s.len());
     let mut sorting_map = PlHashMap::with_capacity(capacity);
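Beyond the visibility change, it may help to sketch what these two helpers do. Below is a minimal, self-contained approximation using plain std types as hypothetical stand-ins for `RowGroupMetadata`, `PlHashMap`, and `IsSorted`; the real `try_set_sorted_flag` additionally gates on `should_copy_sortedness` (visible in the hunk context above) and mutates the `Series` flag rather than returning it.

use std::collections::HashMap;

// Hypothetical stand-in for the Parquet metadata's SortingColumn entry
// (leaf column index plus sort direction; the real type also has `nulls_first`).
struct SortingColumn {
    column_idx: i32,
    descending: bool,
}

#[derive(Clone, Copy, Debug, PartialEq)]
enum IsSorted {
    Ascending,
    Descending,
}

// Rough analogue of `create_sorting_map`: index the row group's declared
// sorting columns by leaf column index.
fn create_sorting_map(sorting_columns: Option<&[SortingColumn]>) -> HashMap<usize, IsSorted> {
    let cols = sorting_columns.unwrap_or(&[]);
    let mut map = HashMap::with_capacity(cols.len());
    for sc in cols {
        let flag = if sc.descending {
            IsSorted::Descending
        } else {
            IsSorted::Ascending
        };
        map.insert(sc.column_idx as usize, flag);
    }
    map
}

// Rough analogue of `try_set_sorted_flag`: report the flag that would be set
// on a decoded column, if the writer declared one for this leaf index.
fn sorted_flag_for(col_idx: usize, sorting_map: &HashMap<usize, IsSorted>) -> Option<IsSorted> {
    sorting_map.get(&col_idx).copied()
}

fn main() {
    let md = [SortingColumn { column_idx: 0, descending: false }];
    let sorting_map = create_sorting_map(Some(&md));
    assert_eq!(sorted_flag_for(0, &sorting_map), Some(IsSorted::Ascending));
    assert_eq!(sorted_flag_for(1, &sorting_map), None);
}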
5 changes: 5 additions & 0 deletions crates/polars-parquet/src/parquet/metadata/row_metadata.rs
@@ -60,6 +60,11 @@ impl RowGroupMetadata {
             .map(|x| x.iter().map(|&x| &self.columns[x]))
     }
 
+    /// Fetch all column indices under this root name, if it exists.
+    pub fn columns_idxs_under_root_iter<'a>(&'a self, root_name: &str) -> Option<&'a [usize]> {
+        self.column_lookup.get(root_name).map(|x| x.as_slice())
+    }
+
     /// Number of rows in this row group.
     pub fn num_rows(&self) -> usize {
         self.num_rows
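For orientation, a hypothetical sketch of the structure behind this method: `column_lookup` maps a root field name to the indices of its leaf columns, so a flat column yields a single index while a nested root (e.g. a struct) yields several. That distinction is what the decode path below keys on.

use std::collections::HashMap;

// Hypothetical stand-in for RowGroupMetadata's lookup table.
struct Metadata {
    column_lookup: HashMap<String, Vec<usize>>, // root name -> leaf column indices
}

impl Metadata {
    // Mirrors `columns_idxs_under_root_iter`: borrow the indices as a slice.
    fn columns_idxs_under_root_iter<'a>(&'a self, root_name: &str) -> Option<&'a [usize]> {
        self.column_lookup.get(root_name).map(|x| x.as_slice())
    }
}

fn main() {
    let md = Metadata {
        column_lookup: HashMap::from([
            ("a".to_string(), vec![0]),    // flat column: one leaf
            ("s".to_string(), vec![1, 2]), // struct with two fields: two leaves
        ]),
    };
    assert_eq!(md.columns_idxs_under_root_iter("a"), Some(&[0usize][..]));
    assert_eq!(md.columns_idxs_under_root_iter("missing"), None);
}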
6 changes: 5 additions & 1 deletion crates/polars-stream/src/nodes/parquet_source/row_group_data_fetch.rs
@@ -2,11 +2,12 @@ use std::future::Future;
 use std::sync::Arc;
 
 use polars_core::prelude::{ArrowSchema, InitHashMaps, PlHashMap};
+use polars_core::series::IsSorted;
 use polars_core::utils::operation_exceeded_idxsize_msg;
 use polars_error::{polars_err, PolarsResult};
 use polars_io::predicates::PhysicalIoExpr;
-use polars_io::prelude::FileMetadata;
 use polars_io::prelude::_internal::read_this_row_group;
+use polars_io::prelude::{create_sorting_map, FileMetadata};
 use polars_io::utils::byte_source::{ByteSource, DynByteSource};
 use polars_io::utils::slice::SplitSlicePosition;
 use polars_parquet::read::RowGroupMetadata;
@@ -27,6 +28,7 @@ pub(super) struct RowGroupData {
     pub(super) slice: Option<(usize, usize)>,
     pub(super) file_max_row_group_height: usize,
     pub(super) row_group_metadata: RowGroupMetadata,
+    pub(super) sorting_map: PlHashMap<usize, IsSorted>,
     pub(super) shared_file_state: Arc<tokio::sync::OnceCell<SharedFileState>>,
 }

@@ -86,6 +88,7 @@ impl RowGroupDataFetcher {
             let current_row_group_idx = self.current_row_group_idx;
 
             let num_rows = row_group_metadata.num_rows();
+            let sorting_map = create_sorting_map(&row_group_metadata);
 
             self.current_row_offset = current_row_offset.saturating_add(num_rows);
             self.current_row_group_idx += 1;
@@ -246,6 +249,7 @@ impl RowGroupDataFetcher {
                 slice,
                 file_max_row_group_height: current_max_row_group_height,
                 row_group_metadata,
+                sorting_map,
                 shared_file_state: current_shared_file_state.clone(),
             })
         });
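Worth noting: the sorting map is derived purely from metadata, so the fetcher can build it once per row group, before any column data is downloaded or decoded, and stash it on `RowGroupData` for every per-column decode below to reuse.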
35 changes: 27 additions & 8 deletions crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs
@@ -11,6 +11,7 @@ use polars_error::{polars_bail, PolarsResult};
 use polars_io::predicates::PhysicalIoExpr;
 use polars_io::prelude::_internal::calc_prefilter_cost;
 pub use polars_io::prelude::_internal::PrefilterMaskSetting;
+use polars_io::prelude::try_set_sorted_flag;
 use polars_io::RowIndex;
 use polars_plan::plans::hive::HivePartitions;
 use polars_plan::plans::ScanSources;
@@ -367,11 +368,20 @@ fn decode_column(
 
     assert_eq!(array.len(), expected_num_rows);
 
-    let series = Series::try_from((arrow_field, array))?;
+    let mut series = Series::try_from((arrow_field, array))?;
+
+    if let Some(col_idxs) = row_group_data
+        .row_group_metadata
+        .columns_idxs_under_root_iter(&arrow_field.name)
+    {
+        if col_idxs.len() == 1 {
+            try_set_sorted_flag(&mut series, col_idxs[0], &row_group_data.sorting_map);
+        }
+    }
 
     // TODO: Also load in the metadata.
 
-    Ok(series.into())
+    Ok(series.into_column())
 }
 
 /// # Safety
@@ -652,17 +662,26 @@ fn decode_column_prefiltered(
         deserialize_filter,
     )?;
 
-    let column = Series::try_from((arrow_field, array))?.into_column();
+    let mut series = Series::try_from((arrow_field, array))?;
+
+    if let Some(col_idxs) = row_group_data
+        .row_group_metadata
+        .columns_idxs_under_root_iter(&arrow_field.name)
+    {
+        if col_idxs.len() == 1 {
+            try_set_sorted_flag(&mut series, col_idxs[0], &row_group_data.sorting_map);
+        }
+    }
 
-    let column = if !prefilter {
-        column.filter(mask)?
+    let series = if !prefilter {
+        series.filter(mask)?
     } else {
-        column
+        series
    };
 
-    assert_eq!(column.len(), expected_num_rows);
+    assert_eq!(series.len(), expected_num_rows);
 
-    Ok(column)
+    Ok(series.into_column())
 }
 
 mod tests {
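The gate in both decode paths, copying sortedness only when the root name maps to exactly one leaf column, exists because Parquet `SortingColumn` entries describe individual leaf column chunks, while a nested root has several leaves whose per-leaf ordering need not make the assembled column sorted. A self-contained sketch of that gate, with hypothetical types mirroring the logic rather than the real polars API:

use std::collections::HashMap;

#[derive(Clone, Copy, Debug, PartialEq)]
enum IsSorted {
    Ascending,
    Descending,
}

// `leaf_idxs` is what `columns_idxs_under_root_iter` returns for one root name.
fn sorted_flag_for_root(
    leaf_idxs: &[usize],
    sorting_map: &HashMap<usize, IsSorted>,
) -> Option<IsSorted> {
    match leaf_idxs {
        // Exactly one leaf: chunk-level sortedness transfers to the Series.
        [idx] => sorting_map.get(idx).copied(),
        // Zero or several leaves (nested roots): don't claim sortedness.
        _ => None,
    }
}

fn main() {
    let sorting_map = HashMap::from([(0usize, IsSorted::Ascending), (2, IsSorted::Descending)]);
    assert_eq!(sorted_flag_for_root(&[0], &sorting_map), Some(IsSorted::Ascending));
    assert_eq!(sorted_flag_for_root(&[1, 2], &sorting_map), None); // struct root
    assert_eq!(sorted_flag_for_root(&[3], &sorting_map), None);    // no SortingColumn
}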
20 changes: 8 additions & 12 deletions py-polars/tests/unit/io/test_parquet.py
@@ -1993,7 +1993,7 @@ def test_nested_nonnullable_19158() -> None:
 
 
 @pytest.mark.parametrize("parallel", ["prefiltered", "columns", "row_groups", "auto"])
-def test_conserve_sortedness(parallel: pl.ParallelStrategy) -> None:
+def test_conserve_sortedness(monkeypatch: Any, capfd: Any, parallel: pl.ParallelStrategy) -> None:
     f = io.BytesIO()
 
     df = pl.DataFrame(
@@ -2019,19 +2019,15 @@ def test_conserve_sortedness(parallel: pl.ParallelStrategy) -> None:
     )
 
     f.seek(0)
+
+    monkeypatch.setenv("POLARS_VERBOSE", "1")
+
     df = pl.scan_parquet(f, parallel=parallel).filter(pl.col.f > 1).collect()
 
-    cols = ["a", "b", "c", "d", "a_nosort"]
+    captured = capfd.readouterr().err
 
     # @NOTE: We don't conserve sortedness for anything except integers at the
     # moment.
-    assert_frame_equal(
-        df._to_metadata(cols, ["sorted_asc", "sorted_dsc"]),
-        pl.DataFrame(
-            {
-                "column_name": cols,
-                "sorted_asc": [True, False, False, False, False],
-                "sorted_dsc": [False, False, True, False, False],
-            }
-        ),
-    )
+    assert captured.count("Parquet conserved SortingColumn for column chunk of") == 2
+    assert "Parquet conserved SortingColumn for column chunk of 'a' to Ascending" in captured
+    assert "Parquet conserved SortingColumn for column chunk of 'c' to Descending" in captured

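The reworked test leans on the reader's verbose logging instead of `_to_metadata`: with `POLARS_VERBOSE=1` the reader prints a message each time it conserves a `SortingColumn`, and the `capfd` assertions pin that to exactly two column chunks, 'a' as ascending and 'c' as descending, in line with the @NOTE that only integer columns currently qualify.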