refactor(rust): Remove short-lived / non-CPU bound task spawns on async executor in new-streaming #18764

Merged (3 commits) on Sep 18, 2024
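The theme of the diff below: futures that are short-lived or not CPU-bound (a dispatch loop, a trivially-ready result, metadata plumbing) no longer go through the new-streaming engine's CPU-bound async_executor; they are spawned directly onto the shared tokio I/O runtime and wrapped in task_handles_ext::AbortOnDropHandle so they are still cancelled if the handle is dropped. Because a tokio JoinHandle resolves to Result<T, JoinError> rather than T, several handle.await call sites below become handle.await.unwrap(). As a hedged sketch of the wrapper's assumed shape, inferred from the tuple-struct construction used in this diff and not the verbatim polars source:

// Sketch of an abort-on-drop wrapper over a tokio JoinHandle, matching how
// task_handles_ext::AbortOnDropHandle is used in this diff (tuple-struct
// construction, awaits to a join Result). Assumed shape, not the actual
// polars implementation.
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

pub struct AbortOnDropHandle<T>(pub tokio::task::JoinHandle<T>);

impl<T> Drop for AbortOnDropHandle<T> {
    fn drop(&mut self) {
        // Cancel the underlying task if the handle is dropped before completion.
        self.0.abort();
    }
}

impl<T> Future for AbortOnDropHandle<T> {
    type Output = Result<T, tokio::task::JoinError>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // JoinHandle is Unpin, so re-pinning the inner handle is fine.
        Pin::new(&mut self.0).poll(cx)
    }
}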
32 changes: 14 additions & 18 deletions crates/polars-stream/src/nodes/parquet_source/init.rs
@@ -16,6 +16,7 @@ use crate::async_primitives::connector::connector;
 use crate::async_primitives::wait_group::{WaitGroup, WaitToken};
 use crate::morsel::get_ideal_morsel_size;
 use crate::nodes::{MorselSeq, TaskPriority};
+use crate::utils::task_handles_ext;
 
 impl ParquetSourceNode {
     /// # Panics
@@ -36,7 +36,7 @@ impl ParquetSourceNode {
         // Safety
         // * We dropped the receivers on the line above
         // * This function is only called once.
-        morsel_stream_task_handle.await
+        morsel_stream_task_handle.await.unwrap()
     }
 
     pub(super) fn shutdown(&self) -> impl Future<Output = PolarsResult<()>> {
@@ -61,15 +62,16 @@ impl ParquetSourceNode {
             .spawn(Self::shutdown_impl(async_task_data, self.verbose));
     }
 
-    /// Constructs the task that provides a morsel stream.
+    /// Constructs the task that distributes morsels across the engine pipelines.
     #[allow(clippy::type_complexity)]
-    pub(super) fn init_raw_morsel_stream(
+    pub(super) fn init_raw_morsel_distributor(
         &mut self,
     ) -> (
         Vec<crate::async_primitives::connector::Receiver<(DataFrame, MorselSeq, WaitToken)>>,
-        async_executor::AbortOnDropHandle<PolarsResult<()>>,
+        task_handles_ext::AbortOnDropHandle<PolarsResult<()>>,
     ) {
         let verbose = self.verbose;
+        let io_runtime = polars_io::pl_async::get_runtime();
 
         let use_statistics = self.options.use_statistics;
 
@@ -79,10 +81,7 @@
         if let Some((_, 0)) = self.file_options.slice {
             return (
                 raw_morsel_receivers,
-                async_executor::AbortOnDropHandle::new(async_executor::spawn(
-                    TaskPriority::Low,
-                    std::future::ready(Ok(())),
-                )),
+                task_handles_ext::AbortOnDropHandle(io_runtime.spawn(std::future::ready(Ok(())))),
             );
         }
 
@@ -126,12 +125,9 @@
         let row_group_decoder = self.init_row_group_decoder();
         let row_group_decoder = Arc::new(row_group_decoder);
 
-        // Processes row group metadata and spawns I/O tasks to fetch row group data. This is
-        // currently spawned onto the CPU runtime as it does not directly make any async I/O calls,
-        // but instead it potentially performs predicate/slice evaluation on metadata. If we observe
-        // that under heavy CPU load scenarios the I/O throughput drops due to this task not being
-        // scheduled we can change it to be a high priority task.
-        let morsel_stream_task_handle = async_executor::spawn(TaskPriority::Low, async move {
+        // Distributes morsels across pipelines. This does not perform any CPU or I/O bound work -
+        // it is purely a dispatch loop.
+        let raw_morsel_distributor_task_handle = io_runtime.spawn(async move {
             let slice_range = {
                 let Ok(slice) = normalized_slice_oneshot_rx.await else {
                     // If we are here then the producer probably errored.
@@ -177,7 +173,7 @@
                 .into_stream()
                 .map(|x| async {
                     match x {
-                        Ok(handle) => handle.await,
+                        Ok(handle) => handle.await.unwrap(),
                         Err(e) => Err(e),
                     }
                 })
@@ -258,10 +254,10 @@
             metadata_task_handle.await.unwrap()
         });
 
-        let morsel_stream_task_handle =
-            async_executor::AbortOnDropHandle::new(morsel_stream_task_handle);
+        let raw_morsel_distributor_task_handle =
+            task_handles_ext::AbortOnDropHandle(raw_morsel_distributor_task_handle);
 
-        (raw_morsel_receivers, morsel_stream_task_handle)
+        (raw_morsel_receivers, raw_morsel_distributor_task_handle)
     }
 
     /// Creates a `RowGroupDecoder` that turns `RowGroupData` into DataFrames.
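An aside on the handle.await.unwrap() changes above: the engine's own async_executor handles resolve directly to the future's output, while a tokio JoinHandle resolves to Result<T, JoinError>, so the join result has to be unwrapped first. A self-contained illustration using plain tokio (assumption: io_runtime is a tokio runtime, as the JoinHandle usage in this diff implies):

fn main() {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();

    rt.block_on(async {
        // A short-lived, non-CPU-bound future of the kind this PR stops
        // routing through the compute executor.
        let handle = tokio::spawn(std::future::ready(Ok::<(), ()>(())));

        // Awaiting a tokio JoinHandle yields Result<T, JoinError>; the inner
        // value is the future's own output. This mirrors the
        // `handle.await` -> `handle.await.unwrap()` changes above.
        let joined: Result<Result<(), ()>, tokio::task::JoinError> = handle.await;
        assert!(joined.unwrap().is_ok());
    });
}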
9 changes: 5 additions & 4 deletions crates/polars-stream/src/nodes/parquet_source/mod.rs
@@ -17,9 +17,9 @@ use polars_plan::prelude::FileScanOptions;
 
 use super::compute_node_prelude::*;
 use super::{MorselSeq, TaskPriority};
-use crate::async_executor::{self};
 use crate::async_primitives::wait_group::WaitToken;
 use crate::morsel::SourceToken;
+use crate::utils::task_handles_ext;
 
 mod init;
 mod mem_prefetch_funcs;
@@ -30,7 +30,7 @@ mod row_group_decode;
 
 type AsyncTaskData = Option<(
     Vec<crate::async_primitives::connector::Receiver<(DataFrame, MorselSeq, WaitToken)>>,
-    async_executor::AbortOnDropHandle<PolarsResult<()>>,
+    task_handles_ext::AbortOnDropHandle<PolarsResult<()>>,
)>;
 
 #[allow(clippy::type_complexity)]
@@ -157,12 +157,13 @@ impl ComputeNode for ParquetSourceNode {
         self.init_projected_arrow_schema();
         self.physical_predicate = self.predicate.clone().map(phys_expr_to_io_expr);
 
-        let (raw_morsel_receivers, morsel_stream_task_handle) = self.init_raw_morsel_stream();
+        let (raw_morsel_receivers, raw_morsel_distributor_task_handle) =
+            self.init_raw_morsel_distributor();
 
         self.async_task_data
             .try_lock()
             .unwrap()
-            .replace((raw_morsel_receivers, morsel_stream_task_handle));
+            .replace((raw_morsel_receivers, raw_morsel_distributor_task_handle));
     }
 
     fn update_state(&mut self, recv: &mut [PortState], send: &mut [PortState]) -> PolarsResult<()> {
crates/polars-stream/src/nodes/parquet_source/row_group_data_fetch.rs
@@ -17,8 +17,6 @@ use polars_utils::IdxSize;
 
 use super::mem_prefetch_funcs;
 use super::row_group_decode::SharedFileState;
-use crate::async_executor;
-use crate::nodes::TaskPriority;
 use crate::utils::task_handles_ext;
 
 /// Represents byte-data that can be transformed into a DataFrame after some computation.
@@ -81,7 +79,7 @@ impl RowGroupDataFetcher {
 
     pub(super) async fn next(
         &mut self,
-    ) -> Option<PolarsResult<async_executor::AbortOnDropHandle<PolarsResult<RowGroupData>>>> {
+    ) -> Option<PolarsResult<task_handles_ext::AbortOnDropHandle<PolarsResult<RowGroupData>>>> {
         'main: loop {
             for row_group_metadata in self.current_row_groups.by_ref() {
                 let current_row_offset = self.current_row_offset;
@@ -164,9 +162,7 @@
                 let current_path_index = self.current_path_index;
                 let current_max_row_group_height = self.current_max_row_group_height;
 
-                // Push calculation of byte ranges to a task to run in parallel, as it can be
-                // expensive for very wide tables and projections.
-                let handle = async_executor::spawn(TaskPriority::Low, async move {
+                let handle = io_runtime.spawn(async move {
                     let fetched_bytes = if let DynByteSource::MemSlice(mem_slice) =
                         current_byte_source.as_ref()
                     {
@@ -205,16 +201,9 @@
                             &row_group_metadata,
                             columns.as_ref(),
                         )
-                        .collect::<Arc<[_]>>();
+                        .collect::<Vec<_>>();
 
-                    let bytes = {
-                        let ranges_2 = ranges.clone();
-                        task_handles_ext::AbortOnDropHandle(io_runtime.spawn(async move {
-                            current_byte_source.get_ranges(ranges_2.as_ref()).await
-                        }))
-                        .await
-                        .unwrap()?
-                    };
+                    let bytes = current_byte_source.get_ranges(ranges.as_ref()).await?;
 
                     assert_eq!(bytes.len(), ranges.len());
 
@@ -261,7 +250,7 @@
                     })
                 });
 
-                let handle = async_executor::AbortOnDropHandle::new(handle);
+                let handle = task_handles_ext::AbortOnDropHandle(handle);
                 return Some(Ok(handle));
             }
 
@@ -302,12 +291,12 @@ type RowGroupDataStreamFut = std::pin::Pin<Box<
     dyn Future<
            Output =
                (
-                    Box<RowGroupDataFetcher> ,
-                    Option <
-                        PolarsResult <
-                            async_executor::AbortOnDropHandle <
-                                PolarsResult <
-                                    RowGroupData > > > >
+                    Box<RowGroupDataFetcher> ,
+                    Option <
+                        PolarsResult <
+                            task_handles_ext::AbortOnDropHandle <
+                                PolarsResult <
+                                    RowGroupData > > > >
                )
        > + Send
 >>;
@@ -335,7 +324,7 @@ impl RowGroupDataStream {
 }
 
 impl futures::stream::Stream for RowGroupDataStream {
-    type Item = PolarsResult<async_executor::AbortOnDropHandle<PolarsResult<RowGroupData>>>;
+    type Item = PolarsResult<task_handles_ext::AbortOnDropHandle<PolarsResult<RowGroupData>>>;
 
     fn poll_next(
         mut self: std::pin::Pin<&mut Self>,
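Worth spelling out why the Arc<[_]> became a plain Vec<_> here: in the old code the byte ranges had to cross a task boundary into a nested io_runtime.spawn (hence the cheaply clonable Arc<[_]> and the ranges_2 clone), because the enclosing task lived on the CPU executor and could not await the I/O call directly. Now the enclosing task itself runs on the I/O runtime, get_ranges is awaited inline, and nothing is moved into another task. A minimal sketch of the before/after shape, with hypothetical names standing in for DynByteSource::get_ranges:

use std::ops::Range;
use std::sync::Arc;

// Hypothetical stand-in for the byte source's async range fetch.
async fn get_ranges(ranges: &[Range<usize>]) -> std::io::Result<Vec<Vec<u8>>> {
    Ok(ranges.iter().map(|r| vec![0u8; r.len()]).collect())
}

async fn fetch_before(
    rt: &tokio::runtime::Handle,
    ranges: Arc<[Range<usize>]>,
) -> std::io::Result<Vec<Vec<u8>>> {
    // Before: the caller was on the CPU executor, so the I/O call was hopped
    // onto the I/O runtime via a nested spawn, which forced the ranges behind
    // a cheaply clonable Arc<[_]>.
    let ranges_2 = ranges.clone();
    rt.spawn(async move { get_ranges(ranges_2.as_ref()).await })
        .await
        .unwrap()
}

async fn fetch_after(ranges: Vec<Range<usize>>) -> std::io::Result<Vec<Vec<u8>>> {
    // After: the caller itself runs on the I/O runtime; the call is awaited
    // inline, nothing crosses a task boundary, and a plain Vec suffices.
    get_ranges(&ranges).await
}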
crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs
@@ -66,19 +66,17 @@ impl RowGroupDecoder {
 
         let mut out_columns = Vec::with_capacity(out_width);
 
-        if self.row_index.is_some() {
-            // Add a placeholder so that we don't have to shift the entire vec
-            // later.
-            out_columns.push(Column::default());
-        }
-
         let slice_range = row_group_data
             .slice
             .map(|(offset, len)| offset..offset + len)
             .unwrap_or(0..row_group_data.row_group_metadata.num_rows());
 
         assert!(slice_range.end <= row_group_data.row_group_metadata.num_rows());
 
+        if let Some(s) = self.materialize_row_index(row_group_data.as_ref(), slice_range.clone())? {
+            out_columns.push(s);
+        }
+
         self.decode_all_columns(
             &mut out_columns,
             &row_group_data,
@@ -93,10 +91,6 @@
             out_columns.last().unwrap().len()
         };
 
-        if let Some(s) = self.materialize_row_index(row_group_data.as_ref(), slice_range)? {
-            out_columns[0] = s;
-        }
-
         let shared_file_state = row_group_data
             .shared_file_state
             .get_or_init(|| self.shared_file_state_init_func(&row_group_data))
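The decoder change is a reordering rather than a behavioural one: the row-index column depends only on slice_range, not on any decoded column data, so it can be materialized up front and pushed first. That removes the earlier two-step dance of pushing a Column::default() placeholder before decoding and overwriting out_columns[0] afterwards. A generic sketch of the pattern change, with hypothetical names unrelated to the polars source:

// Before: reserve a slot, fill the rest, then patch the slot.
fn build_before(first: impl FnOnce() -> i32, rest: &[i32]) -> Vec<i32> {
    let mut out = vec![0]; // placeholder at index 0
    out.extend_from_slice(rest);
    out[0] = first(); // patched after the fact
    out
}

// After: the first value is independent of the rest, so compute it up front
// and no placeholder is needed.
fn build_after(first: impl FnOnce() -> i32, rest: &[i32]) -> Vec<i32> {
    let mut out = Vec::with_capacity(1 + rest.len());
    out.push(first());
    out.extend_from_slice(rest);
    out
}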