perf: Move row group decode off async thread for local streaming parquet scan (#19828)
nameexhaustion authored Nov 17, 2024
1 parent 01be57f commit 5210d99
Showing 2 changed files with 68 additions and 71 deletions.
111 changes: 40 additions & 71 deletions crates/polars-io/src/parquet/read/read_impl.rs
@@ -1152,6 +1152,7 @@ impl BatchedParquetReader {
         self.row_group_offset + n > self.n_row_groups
     }

+    #[cfg(feature = "async")]
     pub async fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
         if self.rows_read as usize == self.slice.0 + self.slice.1 && self.has_returned {
             return if self.chunks_fifo.is_empty() {
@@ -1182,79 +1183,47 @@ impl BatchedParquetReader {
                 .fetch_row_groups(row_group_range.clone())
                 .await?;

-            let mut dfs = match store {
-                ColumnStore::Local(_) => rg_to_dfs(
-                    &store,
-                    &mut self.rows_read,
-                    row_group_range.start,
-                    row_group_range.end,
-                    self.slice,
-                    &self.metadata,
-                    &self.schema,
-                    self.predicate.as_deref(),
-                    self.row_index.clone(),
-                    self.parallel,
-                    &self.projection,
-                    self.use_statistics,
-                    self.hive_partition_columns.as_deref(),
-                ),
-                #[cfg(feature = "async")]
-                ColumnStore::Fetched(b) => {
-                    // This branch we spawn the decoding and decompression of the bytes on a rayon task.
-                    // This will ensure we don't block the async thread.
-
-                    // Reconstruct as that makes it a 'static.
-                    let store = ColumnStore::Fetched(b);
-                    let (tx, rx) = tokio::sync::oneshot::channel();
-
-                    // Make everything 'static.
-                    let mut rows_read = self.rows_read;
-                    let row_index = self.row_index.clone();
-                    let predicate = self.predicate.clone();
-                    let schema = self.schema.clone();
-                    let metadata = self.metadata.clone();
-                    let parallel = self.parallel;
-                    let projection = self.projection.clone();
-                    let use_statistics = self.use_statistics;
-                    let hive_partition_columns = self.hive_partition_columns.clone();
-                    let slice = self.slice;
-
-                    let f = move || {
-                        let dfs = rg_to_dfs(
-                            &store,
-                            &mut rows_read,
-                            row_group_range.start,
-                            row_group_range.end,
-                            slice,
-                            &metadata,
-                            &schema,
-                            predicate.as_deref(),
-                            row_index,
-                            parallel,
-                            &projection,
-                            use_statistics,
-                            hive_partition_columns.as_deref(),
-                        );
-
-                        // Don't unwrap send attempt - async task could be cancelled.
-                        let _ = tx.send((dfs, rows_read));
-                    };
+            let mut dfs = {
+                // Spawn the decoding and decompression of the bytes on a rayon task.
+                // This will ensure we don't block the async thread.
+
+                // Make everything 'static.
+                let mut rows_read = self.rows_read;
+                let row_index = self.row_index.clone();
+                let predicate = self.predicate.clone();
+                let schema = self.schema.clone();
+                let metadata = self.metadata.clone();
+                let parallel = self.parallel;
+                let projection = self.projection.clone();
+                let use_statistics = self.use_statistics;
+                let hive_partition_columns = self.hive_partition_columns.clone();
+                let slice = self.slice;
+
+                let func = move || {
+                    let dfs = rg_to_dfs(
+                        &store,
+                        &mut rows_read,
+                        row_group_range.start,
+                        row_group_range.end,
+                        slice,
+                        &metadata,
+                        &schema,
+                        predicate.as_deref(),
+                        row_index,
+                        parallel,
+                        &projection,
+                        use_statistics,
+                        hive_partition_columns.as_deref(),
+                    );

-                    // Spawn the task and wait on it asynchronously.
-                    if POOL.current_thread_index().is_some() {
-                        // We are a rayon thread, so we can't use POOL.spawn as it would mean we spawn a task and block until
-                        // another rayon thread executes it - we would deadlock if all rayon threads did this.
-                        // Safety: The tokio runtime flavor is multi-threaded.
-                        tokio::task::block_in_place(f);
-                    } else {
-                        POOL.spawn(f);
-                    };
+                    dfs.map(|x| (x, rows_read))
+                };

-                    let (dfs, rows_read) = rx.await.unwrap();
-                    self.rows_read = rows_read;
-                    dfs
-                },
-            }?;
+                let (dfs, rows_read) = crate::pl_async::get_runtime().spawn_rayon(func).await?;
+
+                self.rows_read = rows_read;
+                dfs
+            };

         if let Some(ca) = self.include_file_path.as_mut() {
             let mut max_len = 0;
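The rewritten call site always hands the row-group decode to the rayon pool, whereas the removed match only did so for the `ColumnStore::Fetched` branch and decoded local files inline on the async thread. The sketch below isolates the ownership pattern used above: state the closure needs is cloned or copied out of `self` so the closure is `'static`, and the mutated `rows_read` counter is returned alongside the data instead of being captured through `&mut self`. It is a minimal sketch only; `Reader` and `decode` are made-up stand-ins for `BatchedParquetReader` and `rg_to_dfs`, and tokio's `spawn_blocking` is used in place of the rayon handoff so the example stays self-contained.

struct Reader {
    rows_read: u64,
    projection: Vec<usize>,
}

// Stand-in for `rg_to_dfs`: pretend each projected column yields one value and
// bump the row counter as a side effect.
fn decode(projection: &[usize], rows_read: &mut u64) -> Vec<u64> {
    *rows_read += projection.len() as u64;
    projection.iter().map(|&i| i as u64).collect()
}

impl Reader {
    async fn next_batch(&mut self) -> Vec<u64> {
        // Clone/copy everything the closure needs so it borrows nothing from `self`.
        let mut rows_read = self.rows_read;
        let projection = self.projection.clone();

        let func = move || {
            let out = decode(&projection, &mut rows_read);
            // Thread the updated counter back through the return value.
            (out, rows_read)
        };

        // Offload the CPU-bound work; polars hands this to its rayon pool instead.
        let (out, rows_read) = tokio::task::spawn_blocking(func).await.unwrap();
        self.rows_read = rows_read;
        out
    }
}

#[tokio::main]
async fn main() {
    let mut reader = Reader { rows_read: 0, projection: vec![0, 1, 2] };
    let batch = reader.next_batch().await;
    println!("batch = {batch:?}, rows_read = {}", reader.rows_read);
}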
28 changes: 28 additions & 0 deletions crates/polars-io/src/pl_async.rs
@@ -317,6 +317,34 @@ impl RuntimeManager {
     {
         self.rt.spawn_blocking(f)
     }
+
+    /// Run a task on the rayon threadpool. To avoid deadlocks, if the current thread is already a
+    /// rayon thread, the task is executed on the current thread after tokio's `block_in_place` is
+    /// used to spawn another thread to poll futures.
+    pub async fn spawn_rayon<F, O>(&self, func: F) -> O
+    where
+        F: FnOnce() -> O + Send + Sync + 'static,
+        O: Send + Sync + 'static,
+    {
+        if POOL.current_thread_index().is_some() {
+            // We are a rayon thread, so we can't use POOL.spawn as it would mean we spawn a task and block until
+            // another rayon thread executes it - we would deadlock if all rayon threads did this.
+            // Safety: The tokio runtime flavor is multi-threaded.
+            tokio::task::block_in_place(func)
+        } else {
+            let (tx, rx) = tokio::sync::oneshot::channel();
+
+            let func = move || {
+                let out = func();
+                // Don't unwrap send attempt - async task could be cancelled.
+                let _ = tx.send(out);
+            };
+
+            POOL.spawn(func);
+
+            rx.await.unwrap()
+        }
+    }
 }

 static RUNTIME: Lazy<RuntimeManager> = Lazy::new(RuntimeManager::new);
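The new helper centralizes the rayon/tokio handoff that the parquet reader previously open-coded. Below is a minimal standalone sketch of the same dispatch pattern, under stated assumptions: `spawn_on_rayon` is a hypothetical stand-in for `RuntimeManager::spawn_rayon`, and it uses the global rayon pool (`rayon::current_thread_index`, `rayon::spawn`) rather than polars' `POOL` static.

use tokio::sync::oneshot;

async fn spawn_on_rayon<F, O>(func: F) -> O
where
    F: FnOnce() -> O + Send + 'static,
    O: Send + 'static,
{
    if rayon::current_thread_index().is_some() {
        // Already on a rayon worker: spawning onto the pool and waiting could
        // deadlock if every worker did the same, so run the closure right here
        // while `block_in_place` lets tokio move other futures off this thread.
        // Requires the multi-threaded tokio runtime flavor.
        tokio::task::block_in_place(func)
    } else {
        let (tx, rx) = oneshot::channel();
        rayon::spawn(move || {
            // Ignore the send error: the awaiting task may have been cancelled.
            let _ = tx.send(func());
        });
        rx.await.expect("rayon task dropped the sender")
    }
}

#[tokio::main(flavor = "multi_thread")]
async fn main() {
    // CPU-heavy work (a dummy sum here) runs on rayon, not on a tokio worker thread.
    let total = spawn_on_rayon(|| (0u64..10_000_000).sum::<u64>()).await;
    println!("sum = {total}");
}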
