perf: Don't maintain order when maintain_order=False in new streaming sinks (#21586)

coastalwhite authored Mar 4, 2025
1 parent 32042c2 commit 51caaed
Showing 21 changed files with 118 additions and 50 deletions.
2 changes: 0 additions & 2 deletions crates/polars-io/src/csv/write/options.rs
@@ -10,7 +10,6 @@ pub struct CsvWriterOptions {
pub include_bom: bool,
pub include_header: bool,
pub batch_size: NonZeroUsize,
pub maintain_order: bool,
pub serialize_options: SerializeOptions,
}

@@ -20,7 +19,6 @@ impl Default for CsvWriterOptions {
include_bom: false,
include_header: true,
batch_size: NonZeroUsize::new(1024).unwrap(),
maintain_order: false,
serialize_options: SerializeOptions::default(),
}
}
2 changes: 0 additions & 2 deletions crates/polars-io/src/ipc/write.rs
@@ -14,8 +14,6 @@ use crate::shared::schema_to_arrow_checked;
pub struct IpcWriterOptions {
/// Data page compression
pub compression: Option<IpcCompression>,
/// maintain the order the data was processed
pub maintain_order: bool,
}

impl IpcWriterOptions {
5 changes: 1 addition & 4 deletions crates/polars-io/src/json/mod.rs
@@ -82,10 +82,7 @@ use crate::prelude::*;

#[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct JsonWriterOptions {
/// maintain the order the data was processed
pub maintain_order: bool,
}
pub struct JsonWriterOptions {}

/// The format to use to write the DataFrame to JSON: `Json` (a JSON array)
/// or `JsonLines` (each row output on a separate line).
2 changes: 0 additions & 2 deletions crates/polars-io/src/parquet/write/options.rs
@@ -17,8 +17,6 @@ pub struct ParquetWriteOptions {
pub row_group_size: Option<usize>,
/// if `None` will be 1024^2 bytes
pub data_page_size: Option<usize>,
/// maintain the order the data was processed
pub maintain_order: bool,
}

/// The compression strategy to use for writing Parquet files.
2 changes: 1 addition & 1 deletion crates/polars-pipe/src/executors/sinks/output/csv.rs
@@ -45,7 +45,7 @@ impl CsvSink {
let io_thread_handle = Arc::new(Some(init_writer_thread(
receiver,
writer,
options.maintain_order,
true,
morsels_per_sink,
)));

2 changes: 1 addition & 1 deletion crates/polars-pipe/src/executors/sinks/output/ipc.rs
@@ -32,7 +32,7 @@ impl IpcSink {
let io_thread_handle = Arc::new(Some(init_writer_thread(
receiver,
writer,
options.maintain_order,
true,
morsels_per_sink,
)));

4 changes: 2 additions & 2 deletions crates/polars-pipe/src/executors/sinks/output/json.rs
@@ -24,7 +24,7 @@ impl JsonSink {
#[allow(clippy::new_ret_no_self)]
pub fn new(
path: &Path,
options: JsonWriterOptions,
_options: JsonWriterOptions,
_schema: &Schema,
cloud_options: Option<&CloudOptions>,
) -> PolarsResult<FilesSink> {
@@ -38,7 +38,7 @@
let io_thread_handle = Arc::new(Some(init_writer_thread(
receiver,
writer,
options.maintain_order,
true,
morsels_per_sink,
)));

14 changes: 13 additions & 1 deletion crates/polars-plan/src/dsl/options.rs
@@ -304,11 +304,23 @@ pub enum SyncOnCloseType {
}

/// Options that apply to all sinks.
#[derive(Clone, PartialEq, Eq, Debug, Default, Hash)]
#[derive(Clone, PartialEq, Eq, Debug, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct SinkOptions {
/// Call sync when closing the file.
pub sync_on_close: SyncOnCloseType,

/// The output file needs to maintain order of the data that comes in.
pub maintain_order: bool,
}

impl Default for SinkOptions {
fn default() -> Self {
Self {
sync_on_close: Default::default(),
maintain_order: true,
}
}
}

#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
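The notable detail in this hunk is that `Default` is now written out by hand: deriving it would yield `maintain_order: false`, whereas the commit keeps ordered output as the default so existing pipelines only change behavior when they opt out. A minimal sketch of the pattern, with simplified stand-in types rather than the real `polars-plan` definitions:

```rust
// Simplified stand-ins for the types touched in this hunk (assumption:
// the variant names are illustrative, not the real SyncOnCloseType).
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
#[allow(dead_code)]
enum SyncOnCloseType {
    #[default]
    None,
    Data,
    All,
}

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct SinkOptions {
    sync_on_close: SyncOnCloseType,
    maintain_order: bool,
}

// A derived Default would set `maintain_order: false`; writing it out by
// hand keeps ordered output as the default behavior.
impl Default for SinkOptions {
    fn default() -> Self {
        Self {
            sync_on_close: Default::default(),
            maintain_order: true,
        }
    }
}

fn main() {
    assert!(SinkOptions::default().maintain_order);

    // Opting out lets a sink write morsels in arrival order.
    let unordered = SinkOptions {
        maintain_order: false,
        ..Default::default()
    };
    assert!(!unordered.maintain_order);
}
```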
1 change: 0 additions & 1 deletion crates/polars-python/src/dataframe/io.rs
@@ -443,7 +443,6 @@ impl PyDataFrame {
statistics: statistics.0,
row_group_size,
data_page_size,
maintain_order: true,
};
write_partitioned_dataset(
&mut self.df,
17 changes: 5 additions & 12 deletions crates/polars-python/src/lazyframe/general.rs
@@ -704,7 +704,7 @@ impl PyLazyFrame {
#[cfg(all(feature = "streaming", feature = "parquet"))]
#[pyo3(signature = (
target, compression, compression_level, statistics, row_group_size, data_page_size,
maintain_order, cloud_options, credential_provider, retries, sink_options
cloud_options, credential_provider, retries, sink_options
))]
fn sink_parquet(
&self,
@@ -715,7 +715,6 @@
statistics: Wrap<StatisticsOptions>,
row_group_size: Option<usize>,
data_page_size: Option<usize>,
maintain_order: bool,
cloud_options: Option<Vec<(String, String)>>,
credential_provider: Option<PyObject>,
retries: usize,
@@ -728,7 +727,6 @@
statistics: statistics.0,
row_group_size,
data_page_size,
maintain_order,
};

let cloud_options = {
@@ -766,21 +764,19 @@
}

#[cfg(all(feature = "streaming", feature = "ipc"))]
#[pyo3(signature = (target, compression, maintain_order, cloud_options, credential_provider, retries, sink_options))]
#[pyo3(signature = (target, compression, cloud_options, credential_provider, retries, sink_options))]
fn sink_ipc(
&self,
py: Python,
target: SinkTarget,
compression: Option<Wrap<IpcCompression>>,
maintain_order: bool,
cloud_options: Option<Vec<(String, String)>>,
credential_provider: Option<PyObject>,
retries: usize,
sink_options: Wrap<SinkOptions>,
) -> PyResult<()> {
let options = IpcWriterOptions {
compression: compression.map(|c| c.0),
maintain_order,
};

#[cfg(feature = "cloud")]
@@ -822,7 +818,7 @@
#[pyo3(signature = (
target, include_bom, include_header, separator, line_terminator, quote_char, batch_size,
datetime_format, date_format, time_format, float_scientific, float_precision, null_value,
quote_style, maintain_order, cloud_options, credential_provider, retries, sink_options
quote_style, cloud_options, credential_provider, retries, sink_options
))]
fn sink_csv(
&self,
@@ -841,7 +837,6 @@
float_precision: Option<usize>,
null_value: Option<String>,
quote_style: Option<Wrap<QuoteStyle>>,
maintain_order: bool,
cloud_options: Option<Vec<(String, String)>>,
credential_provider: Option<PyObject>,
retries: usize,
@@ -866,7 +861,6 @@
let options = CsvWriterOptions {
include_bom,
include_header,
maintain_order,
batch_size,
serialize_options,
};
@@ -908,18 +902,17 @@

#[allow(clippy::too_many_arguments)]
#[cfg(all(feature = "streaming", feature = "json"))]
#[pyo3(signature = (target, maintain_order, cloud_options, credential_provider, retries, sink_options))]
#[pyo3(signature = (target, cloud_options, credential_provider, retries, sink_options))]
fn sink_json(
&self,
py: Python,
target: SinkTarget,
maintain_order: bool,
cloud_options: Option<Vec<(String, String)>>,
credential_provider: Option<PyObject>,
retries: usize,
sink_options: Wrap<SinkOptions>,
) -> PyResult<()> {
let options = JsonWriterOptions { maintain_order };
let options = JsonWriterOptions {};

let cloud_options = {
let cloud_options = parse_cloud_options(
15 changes: 12 additions & 3 deletions crates/polars-python/src/lazyframe/sink.rs
@@ -74,16 +74,25 @@ impl<'py> FromPyObject<'py> for Wrap<SinkOptions> {
fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
let parsed = ob.extract::<pyo3::Bound<'_, PyDict>>()?;

if parsed.len() != 1 {
if parsed.len() != 2 {
return Err(PyValueError::new_err(
"`sink_options` must be a dictionary with exactly 1 field.",
"`sink_options` must be a dictionary with exactly 2 fields.",
));
}

let sync_on_close = PyDictMethods::get_item(&parsed, "sync_on_close")?
.ok_or_else(|| PyValueError::new_err("`sink_options` must have a `sync_on_close` field"))?;
let sync_on_close = sync_on_close.extract::<Wrap<SyncOnCloseType>>()?.0;

Ok(Wrap(SinkOptions { sync_on_close }))
let maintain_order =
PyDictMethods::get_item(&parsed, "maintain_order")?.ok_or_else(|| {
PyValueError::new_err("`sink_options` must have a `maintain_order` field")
})?;
let maintain_order = maintain_order.extract::<bool>()?;

Ok(Wrap(SinkOptions {
sync_on_close,
maintain_order,
}))
}
}
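The extractor above now insists on exactly two keys, so an old-style one-field `sink_options` dict fails loudly instead of silently dropping `maintain_order`. A rough sketch of that validation shape (assumptions: a plain `HashMap` of strings stands in for the pyo3 dict, and `parse_sink_options` is a made-up helper):

```rust
use std::collections::HashMap;

#[derive(Debug)]
struct SinkOptions {
    sync_on_close: String,
    maintain_order: bool,
}

fn parse_sink_options(dict: &HashMap<&str, &str>) -> Result<SinkOptions, String> {
    // Both keys are required, so a stale caller fails loudly instead of
    // silently falling back to a default for the new `maintain_order` field.
    if dict.len() != 2 {
        return Err("`sink_options` must be a dictionary with exactly 2 fields.".into());
    }
    let sync_on_close = dict
        .get("sync_on_close")
        .ok_or("`sink_options` must have a `sync_on_close` field")?
        .to_string();
    let maintain_order = dict
        .get("maintain_order")
        .ok_or("`sink_options` must have a `maintain_order` field")?
        .parse::<bool>()
        .map_err(|e| e.to_string())?;
    Ok(SinkOptions { sync_on_close, maintain_order })
}

fn main() {
    let mut dict = HashMap::new();
    dict.insert("sync_on_close", "none");
    dict.insert("maintain_order", "false");
    println!("{:?}", parse_sink_options(&dict));
}
```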
25 changes: 25 additions & 0 deletions crates/polars-stream/src/async_primitives/linearizer.rs
@@ -59,6 +59,31 @@ impl<T: Ord> Linearizer<T> {
(slf, inserters)
}

pub fn new_with_maintain_order(
num_inserters: usize,
buffer_size: usize,
maintain_order: bool,
) -> (Self, Vec<Inserter<T>>) {
if maintain_order {
return Self::new(num_inserters, buffer_size);
}

let (sender, receiver) = channel(buffer_size * num_inserters);
let receivers = vec![receiver];
let inserters = (0..num_inserters)
.map(|_| Inserter {
sender: sender.clone(),
})
.collect();

let slf = Self {
receivers,
poll_state: PollState::PollAll,
heap: BinaryHeap::with_capacity(1),
};
(slf, inserters)
}

/// Fetch the next ordered item produced by senders.
///
This may wait for each sender to have sent at least one value before the [`Linearizer`]
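The idea behind `new_with_maintain_order` is that unordered mode gives every inserter a clone of one shared channel feeding a single receiver, so values pass through in arrival order and the merge heap stays effectively unused (capacity 1). A toy illustration of that shared-channel shape, using blocking `std::sync::mpsc` as a stand-in for the crate's async channel:

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    // Unordered mode: every producer clones one sender into a single shared
    // channel, so the consumer sees values in arrival order (FIFO), with no
    // per-producer merge step.
    let (tx, rx) = mpsc::sync_channel::<(usize, u32)>(16);
    let handles: Vec<_> = (0..4)
        .map(|id| {
            let tx = tx.clone();
            thread::spawn(move || {
                for v in 0..3 {
                    tx.send((id, v)).unwrap();
                }
            })
        })
        .collect();
    drop(tx); // close the channel once all producers are done

    for h in handles {
        h.join().unwrap();
    }
    // Arrival order, not producer order: cheaper, because nothing sits in a
    // heap waiting for slower producers to catch up.
    for (producer, value) in rx {
        println!("producer {producer} sent {value}");
    }
}
```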
10 changes: 8 additions & 2 deletions crates/polars-stream/src/nodes/io_sinks/csv.rs
@@ -47,6 +47,9 @@ impl SinkNode for CsvSinkNode {
fn is_sink_input_parallel(&self) -> bool {
true
}
fn do_maintain_order(&self) -> bool {
self.sink_options.maintain_order
}

fn spawn_sink(
&mut self,
@@ -74,8 +77,11 @@
// .. -> Encode task
let rxs = recv_port.parallel();
// Encode tasks -> IO task
let (mut lin_rx, lin_txs) =
Linearizer::<Linearized>::new(num_pipelines, DEFAULT_SINK_LINEARIZER_BUFFER_SIZE);
let (mut lin_rx, lin_txs) = Linearizer::<Linearized>::new_with_maintain_order(
num_pipelines,
DEFAULT_SINK_LINEARIZER_BUFFER_SIZE,
self.sink_options.maintain_order,
);

// 16MB
const DEFAULT_ALLOCATION_SIZE: usize = 1 << 24;
3 changes: 3 additions & 0 deletions crates/polars-stream/src/nodes/io_sinks/ipc.rs
@@ -70,6 +70,9 @@ impl SinkNode for IpcSinkNode {
fn is_sink_input_parallel(&self) -> bool {
false
}
fn do_maintain_order(&self) -> bool {
self.sink_options.maintain_order
}

fn spawn_sink(
&mut self,
10 changes: 8 additions & 2 deletions crates/polars-stream/src/nodes/io_sinks/json.rs
@@ -32,6 +32,9 @@ impl SinkNode for NDJsonSinkNode {
fn is_sink_input_parallel(&self) -> bool {
true
}
fn do_maintain_order(&self) -> bool {
self.sink_options.maintain_order
}

fn spawn_sink(
&mut self,
@@ -59,8 +62,11 @@
// .. -> Encode task
let rxs = recv_port.parallel();
// Encode tasks -> IO task
let (mut lin_rx, lin_txs) =
Linearizer::<Linearized>::new(num_pipelines, DEFAULT_SINK_LINEARIZER_BUFFER_SIZE);
let (mut lin_rx, lin_txs) = Linearizer::<Linearized>::new_with_maintain_order(
num_pipelines,
DEFAULT_SINK_LINEARIZER_BUFFER_SIZE,
self.sink_options.maintain_order,
);

// 16MB
const DEFAULT_ALLOCATION_SIZE: usize = 1 << 24;
3 changes: 2 additions & 1 deletion crates/polars-stream/src/nodes/io_sinks/mod.rs
@@ -185,6 +185,7 @@ fn buffer_and_distribute_columns_task(
pub trait SinkNode {
fn name(&self) -> &str;
fn is_sink_input_parallel(&self) -> bool;
fn do_maintain_order(&self) -> bool;

fn spawn_sink(
&mut self,
@@ -304,7 +305,7 @@ impl ComputeNode for SinkComputeNode {
let sink_input = if self.sink.is_sink_input_parallel() {
SinkInputPort::Parallel(recv.parallel())
} else {
SinkInputPort::Serial(recv.serial())
SinkInputPort::Serial(recv.serial_with_maintain_order(self.sink.do_maintain_order()))
};
join_handles.push(scope.spawn_task(TaskPriority::High, async move {
let (token, outcome) = PhaseOutcome::new_shared_wait(wait_group.token());
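The new `do_maintain_order` hook lets each sink surface its `SinkOptions` `maintain_order` flag, and the compute node threads it into the serial input port via `serial_with_maintain_order`. A stripped-down sketch of that dispatch (assumptions: the trait is reduced to two methods, and `Parquetish` is a hypothetical sink):

```rust
// Not the real polars-stream SinkNode; a reduced version to show the hook.
trait SinkNode {
    fn is_sink_input_parallel(&self) -> bool;
    // New in this commit: the sink reports whether its serial input must
    // preserve morsel order.
    fn do_maintain_order(&self) -> bool;
}

struct Parquetish {
    maintain_order: bool,
}

impl SinkNode for Parquetish {
    fn is_sink_input_parallel(&self) -> bool {
        false // parquet-style sinks take serial input
    }
    fn do_maintain_order(&self) -> bool {
        self.maintain_order
    }
}

// Mirrors the compute-node change: the flag is consulted when the serial
// input is built, instead of always linearizing in order.
fn describe_input(sink: &dyn SinkNode) -> &'static str {
    if sink.is_sink_input_parallel() {
        "parallel input"
    } else if sink.do_maintain_order() {
        "serial input, order preserved"
    } else {
        "serial input, arrival order"
    }
}

fn main() {
    let sink = Parquetish { maintain_order: false };
    println!("{}", describe_input(&sink));
}
```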
3 changes: 3 additions & 0 deletions crates/polars-stream/src/nodes/io_sinks/parquet.rs
@@ -78,6 +78,9 @@ impl SinkNode for ParquetSinkNode {
fn is_sink_input_parallel(&self) -> bool {
false
}
fn do_maintain_order(&self) -> bool {
self.sink_options.maintain_order
}

fn spawn_sink(
&mut self,