Skip to content

Commit

Permalink
refactor(rust): Avoid downloading full parquet when initializing new streaming parquet source (#21580)
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion authored Mar 4, 2025
1 parent d56984e commit 8bc706a
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 17 deletions.
18 changes: 16 additions & 2 deletions crates/polars-stream/src/nodes/io_sources/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use polars_io::cloud::CloudOptions;
use polars_io::ipc::IpcScanOptions;
use polars_io::utils::columns_to_projection;
use polars_io::RowIndex;
use polars_plan::dsl::ScanSource;
use polars_plan::dsl::{ScanSource, ScanSourceRef};
use polars_plan::plans::FileInfo;
use polars_plan::prelude::FileScanOptions;
use polars_utils::mmap::MemSlice;
Expand Down Expand Up @@ -497,7 +497,21 @@ impl MultiScanable for IpcSourceNode {
) -> PolarsResult<Self> {
let options = options.clone();

let memslice = source.as_scan_source_ref().to_memslice()?;
// TODO
// * `to_memslice_async_assume_latest` being a non-async function is not ideal.
// * This is also downloading the whole file even if there is a projection
let memslice = {
if let ScanSourceRef::Path(p) = source.as_scan_source_ref() {
polars_io::file_cache::init_entries_from_uri_list(
&[Arc::from(p.to_str().unwrap())],
cloud_options,
)?;
}

source
.as_scan_source_ref()
.to_memslice_async_assume_latest(source.run_async())?
};
let metadata = Arc::new(read_file_metadata(&mut std::io::Cursor::new(
memslice.as_ref(),
))?);
Expand Down
42 changes: 36 additions & 6 deletions crates/polars-stream/src/nodes/io_sources/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ use polars_io::cloud::CloudOptions;
use polars_io::predicates::ScanIOPredicate;
use polars_io::prelude::{FileMetadata, ParquetOptions};
use polars_io::utils::byte_source::DynByteSourceBuilder;
use polars_io::RowIndex;
use polars_parquet::read::read_metadata;
use polars_io::{pl_async, RowIndex};
use polars_parquet::read::schema::infer_schema_with_options;
use polars_plan::dsl::{ScanSource, ScanSources};
use polars_plan::plans::hive::HivePartitions;
Expand Down Expand Up @@ -278,9 +277,40 @@ impl MultiScanable for ParquetSourceNode {
cloud_options: Option<&CloudOptions>,
row_index: Option<PlSmallStr>,
) -> PolarsResult<Self> {
let source = source.into_sources();
let memslice = source.at(0).to_memslice()?;
let file_metadata = read_metadata(&mut std::io::Cursor::new(memslice.as_ref()))?;
let scan_sources = source.into_sources();

let verbose = config::verbose();

let scan_sources_2 = scan_sources.clone();
let cloud_options_2 = cloud_options.cloned();

// TODO: Use _opt_full_bytes if it is Some(_)
let (metadata_bytes, _opt_full_bytes) = pl_async::get_runtime()
.spawn(async move {
let scan_sources = scan_sources_2;
let cloud_options = cloud_options_2;
let source = scan_sources.at(0);

let byte_source = source
.to_dyn_byte_source(
&if scan_sources.is_cloud_url() || config::force_async() {
DynByteSourceBuilder::ObjectStore
} else {
DynByteSourceBuilder::Mmap
},
cloud_options.as_ref(),
)
.await?;

metadata_utils::read_parquet_metadata_bytes(&byte_source, verbose).await
})
.await
.unwrap()?;

let file_metadata = polars_parquet::parquet::read::deserialize_metadata(
metadata_bytes.as_ref(),
metadata_bytes.len() * 2 + 1024,
)?;

let arrow_schema = infer_schema_with_options(&file_metadata, &None)?;
let arrow_schema = Arc::new(arrow_schema);
Expand All @@ -303,7 +333,7 @@ impl MultiScanable for ParquetSourceNode {
);

Ok(ParquetSourceNode::new(
source,
scan_sources,
file_info,
None,
options,
Expand Down
9 changes: 0 additions & 9 deletions crates/polars-stream/src/physical_plan/lower_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,15 +422,6 @@ pub fn lower_ir(
match ScanSource::from_sources(scan_sources) {
Err(s) => scan_sources = s,
Ok(scan_source) => {
#[cfg(feature = "ipc")]
if matches!(scan_type, FileScan::Ipc { .. }) {
// @TODO: All the things the IPC source does not support yet.
if matches!(&scan_source, ScanSource::Path(p) if polars_io::is_cloud_url(p))
{
todo!();
}
}

// Operation ordering:
// * with_row_index() -> slice() -> filter()

Expand Down

0 comments on commit 8bc706a

Please sign in to comment.