Skip to content

Commit

Permalink
feat: Add new PartitionMaxSize sink (#21573)
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite authored Mar 4, 2025
1 parent 54e6080 commit fe56f69
Show file tree
Hide file tree
Showing 24 changed files with 889 additions and 161 deletions.
16 changes: 16 additions & 0 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -808,10 +808,12 @@ impl LazyFrame {
path: &dyn AsRef<Path>,
options: ParquetWriteOptions,
cloud_options: Option<polars_io::cloud::CloudOptions>,
sink_options: SinkOptions,
) -> PolarsResult<()> {
self.sink(
SinkType::File {
path: Arc::new(path.as_ref().to_path_buf()),
sink_options,
file_type: FileType::Parquet(options),
cloud_options,
},
Expand All @@ -828,10 +830,12 @@ impl LazyFrame {
path: impl AsRef<Path>,
options: IpcWriterOptions,
cloud_options: Option<polars_io::cloud::CloudOptions>,
sink_options: SinkOptions,
) -> PolarsResult<()> {
self.sink(
SinkType::File {
path: Arc::new(path.as_ref().to_path_buf()),
sink_options,
file_type: FileType::Ipc(options),
cloud_options,
},
Expand All @@ -848,10 +852,12 @@ impl LazyFrame {
path: impl AsRef<Path>,
options: CsvWriterOptions,
cloud_options: Option<polars_io::cloud::CloudOptions>,
sink_options: SinkOptions,
) -> PolarsResult<()> {
self.sink(
SinkType::File {
path: Arc::new(path.as_ref().to_path_buf()),
sink_options,
file_type: FileType::Csv(options),
cloud_options,
},
Expand All @@ -868,10 +874,12 @@ impl LazyFrame {
path: impl AsRef<Path>,
options: JsonWriterOptions,
cloud_options: Option<polars_io::cloud::CloudOptions>,
sink_options: SinkOptions,
) -> PolarsResult<()> {
self.sink(
SinkType::File {
path: Arc::new(path.as_ref().to_path_buf()),
sink_options,
file_type: FileType::Json(options),
cloud_options,
},
Expand All @@ -889,10 +897,12 @@ impl LazyFrame {
variant: PartitionVariant,
options: ParquetWriteOptions,
cloud_options: Option<polars_io::cloud::CloudOptions>,
sink_options: SinkOptions,
) -> PolarsResult<()> {
self.sink(
SinkType::Partition {
path_f_string: Arc::new(path_f_string.as_ref().to_path_buf()),
sink_options,
variant,
file_type: FileType::Parquet(options),
cloud_options,
Expand All @@ -911,10 +921,12 @@ impl LazyFrame {
variant: PartitionVariant,
options: IpcWriterOptions,
cloud_options: Option<polars_io::cloud::CloudOptions>,
sink_options: SinkOptions,
) -> PolarsResult<()> {
self.sink(
SinkType::Partition {
path_f_string: Arc::new(path_f_string.as_ref().to_path_buf()),
sink_options,
variant,
file_type: FileType::Ipc(options),
cloud_options,
Expand All @@ -933,10 +945,12 @@ impl LazyFrame {
variant: PartitionVariant,
options: CsvWriterOptions,
cloud_options: Option<polars_io::cloud::CloudOptions>,
sink_options: SinkOptions,
) -> PolarsResult<()> {
self.sink(
SinkType::Partition {
path_f_string: Arc::new(path_f_string.as_ref().to_path_buf()),
sink_options,
variant,
file_type: FileType::Csv(options),
cloud_options,
Expand All @@ -955,10 +969,12 @@ impl LazyFrame {
variant: PartitionVariant,
options: JsonWriterOptions,
cloud_options: Option<polars_io::cloud::CloudOptions>,
sink_options: SinkOptions,
) -> PolarsResult<()> {
self.sink(
SinkType::Partition {
path_f_string: Arc::new(path_f_string.as_ref().to_path_buf()),
sink_options,
variant,
file_type: FileType::Json(options),
cloud_options,
Expand Down
1 change: 1 addition & 0 deletions crates/polars-pipe/src/pipeline/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ where
#[allow(unused_variables)]
SinkType::File {
path,
sink_options: _,
file_type,
cloud_options,
} => {
Expand Down
23 changes: 23 additions & 0 deletions crates/polars-plan/src/dsl/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,18 +290,41 @@ pub struct AnonymousScanOptions {
pub fmt_str: &'static str,
}

/// Selects whether, and how much, a sink syncs when it closes its file.
///
/// The default variant is [`SyncOnCloseType::None`].
// NOTE(review): `Data` / `All` presumably map onto `File::sync_data` / `File::sync_all`
// in the sink implementations — confirm there.
#[derive(Clone, Copy, PartialEq, Eq, Debug, Default, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum SyncOnCloseType {
/// Don't call sync on close.
#[default]
None,

/// Sync only the file contents.
Data,
/// Sync the file contents and the metadata.
All,
}

/// Options that apply to all sinks.
///
/// `SinkOptions::default()` leaves `sync_on_close` at [`SyncOnCloseType::None`],
/// i.e. no sync is requested when the file is closed.
#[derive(Clone, PartialEq, Eq, Debug, Default, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct SinkOptions {
/// Call sync when closing the file.
///
/// The variant selects what is synced; see [`SyncOnCloseType`].
pub sync_on_close: SyncOnCloseType,
}

#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum SinkType {
Memory,
File {
path: Arc<PathBuf>,
file_type: FileType,
sink_options: SinkOptions,
cloud_options: Option<polars_io::cloud::CloudOptions>,
},
Partition {
path_f_string: Arc<PathBuf>,
file_type: FileType,
sink_options: SinkOptions,
variant: PartitionVariant,
cloud_options: Option<polars_io::cloud::CloudOptions>,
},
Expand Down
4 changes: 0 additions & 4 deletions crates/polars-python/src/functions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ mod io;
mod lazy;
mod meta;
mod misc;
#[cfg(feature = "pymethods")]
mod partitioning;
mod random;
mod range;
mod string_cache;
Expand All @@ -21,8 +19,6 @@ pub use io::*;
pub use lazy::*;
pub use meta::*;
pub use misc::*;
#[cfg(feature = "pymethods")]
pub use partitioning::*;
pub use random::*;
pub use range::*;
pub use string_cache::*;
Expand Down
24 changes: 0 additions & 24 deletions crates/polars-python/src/functions/partitioning.rs

This file was deleted.

65 changes: 28 additions & 37 deletions crates/polars-python/src/lazyframe/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@ use pyo3::prelude::*;
use pyo3::pybacked::PyBackedStr;
use pyo3::types::{PyDict, PyList};

use super::PyLazyFrame;
use super::{PyLazyFrame, SinkTarget};
use crate::error::PyPolarsErr;
use crate::expr::ToExprs;
use crate::functions::PyPartitioning;
use crate::interop::arrow::to_rust::pyarrow_schema_to_rust;
use crate::lazyframe::visit::NodeTraverser;
use crate::prelude::*;
Expand All @@ -38,31 +37,6 @@ fn pyobject_to_first_path_and_scan_sources(
})
}

/// Where a `sink_*` call writes its output.
///
/// A value is either a plain path or a `PyPartitioning` object.
#[derive(Clone)]
enum SinkTarget {
/// A single output path.
Path(PathBuf),
/// A partitioning description; its `path` and `variant` fields are what the
/// partitioned sink calls consume.
Partition(PyPartitioning),
}

impl<'py> FromPyObject<'py> for SinkTarget {
    /// Converts a Python object into a [`SinkTarget`].
    ///
    /// Extraction as a `PyPartitioning` is tried first. If that fails, its error is
    /// discarded and the object is extracted as a `PathBuf` instead; an error from this
    /// second attempt is the one returned to the caller.
    fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
        match ob.extract::<PyPartitioning>() {
            Ok(partitioning) => Ok(Self::Partition(partitioning)),
            // Not a partitioning object: fall back to interpreting it as a path.
            Err(_) => ob.extract::<PathBuf>().map(Self::Path),
        }
    }
}

impl SinkTarget {
    /// Borrows the path stored in this target without modifying it.
    ///
    /// For `Partition` this is the partitioning object's `path` field; for `Path` it is
    /// the wrapped path itself.
    // NOTE(review): "unformatted" presumably means the partition path is still a
    // format-string template at this point — confirm against the partition sinks.
    fn unformatted_path(&self) -> &Path {
        match self {
            Self::Partition(partitioning) => partitioning.path.as_ref().as_path(),
            Self::Path(file_path) => file_path.as_path(),
        }
    }
}

fn post_opt_callback(
lambda: &PyObject,
root: Node,
Expand Down Expand Up @@ -730,7 +704,7 @@ impl PyLazyFrame {
#[cfg(all(feature = "streaming", feature = "parquet"))]
#[pyo3(signature = (
target, compression, compression_level, statistics, row_group_size, data_page_size,
maintain_order, cloud_options, credential_provider, retries
maintain_order, cloud_options, credential_provider, retries, sink_options
))]
fn sink_parquet(
&self,
Expand All @@ -745,6 +719,7 @@ impl PyLazyFrame {
cloud_options: Option<Vec<(String, String)>>,
credential_provider: Option<PyObject>,
retries: usize,
sink_options: Wrap<SinkOptions>,
) -> PyResult<()> {
let compression = parse_parquet_compression(compression, compression_level)?;

Expand Down Expand Up @@ -773,21 +748,25 @@ impl PyLazyFrame {
py.enter_polars(|| {
let ldf = self.ldf.clone();
match target {
SinkTarget::Path(path) => {
ldf.sink_parquet(&path as &dyn AsRef<Path>, options, cloud_options)
},
SinkTarget::Path(path) => ldf.sink_parquet(
&path as &dyn AsRef<Path>,
options,
cloud_options,
sink_options.0,
),
SinkTarget::Partition(partition) => ldf.sink_parquet_partitioned(
partition.path.as_ref(),
partition.variant,
options,
cloud_options,
sink_options.0,
),
}
})
}

#[cfg(all(feature = "streaming", feature = "ipc"))]
#[pyo3(signature = (target, compression, maintain_order, cloud_options, credential_provider, retries))]
#[pyo3(signature = (target, compression, maintain_order, cloud_options, credential_provider, retries, sink_options))]
fn sink_ipc(
&self,
py: Python,
Expand All @@ -797,6 +776,7 @@ impl PyLazyFrame {
cloud_options: Option<Vec<(String, String)>>,
credential_provider: Option<PyObject>,
retries: usize,
sink_options: Wrap<SinkOptions>,
) -> PyResult<()> {
let options = IpcWriterOptions {
compression: compression.map(|c| c.0),
Expand Down Expand Up @@ -824,12 +804,15 @@ impl PyLazyFrame {
py.enter_polars(|| {
let ldf = self.ldf.clone();
match target {
SinkTarget::Path(path) => ldf.sink_ipc(path, options, cloud_options),
SinkTarget::Path(path) => {
ldf.sink_ipc(path, options, cloud_options, sink_options.0)
},
SinkTarget::Partition(partition) => ldf.sink_ipc_partitioned(
partition.path.as_ref(),
partition.variant,
options,
cloud_options,
sink_options.0,
),
}
})
Expand All @@ -839,7 +822,7 @@ impl PyLazyFrame {
#[pyo3(signature = (
target, include_bom, include_header, separator, line_terminator, quote_char, batch_size,
datetime_format, date_format, time_format, float_scientific, float_precision, null_value,
quote_style, maintain_order, cloud_options, credential_provider, retries
quote_style, maintain_order, cloud_options, credential_provider, retries, sink_options
))]
fn sink_csv(
&self,
Expand All @@ -862,6 +845,7 @@ impl PyLazyFrame {
cloud_options: Option<Vec<(String, String)>>,
credential_provider: Option<PyObject>,
retries: usize,
sink_options: Wrap<SinkOptions>,
) -> PyResult<()> {
let quote_style = quote_style.map_or(QuoteStyle::default(), |wrap| wrap.0);
let null_value = null_value.unwrap_or(SerializeOptions::default().null);
Expand Down Expand Up @@ -908,20 +892,23 @@ impl PyLazyFrame {
py.enter_polars(|| {
let ldf = self.ldf.clone();
match target {
SinkTarget::Path(path) => ldf.sink_csv(path, options, cloud_options),
SinkTarget::Path(path) => {
ldf.sink_csv(path, options, cloud_options, sink_options.0)
},
SinkTarget::Partition(partition) => ldf.sink_csv_partitioned(
partition.path.as_ref(),
partition.variant,
options,
cloud_options,
sink_options.0,
),
}
})
}

#[allow(clippy::too_many_arguments)]
#[cfg(all(feature = "streaming", feature = "json"))]
#[pyo3(signature = (target, maintain_order, cloud_options, credential_provider, retries))]
#[pyo3(signature = (target, maintain_order, cloud_options, credential_provider, retries, sink_options))]
fn sink_json(
&self,
py: Python,
Expand All @@ -930,6 +917,7 @@ impl PyLazyFrame {
cloud_options: Option<Vec<(String, String)>>,
credential_provider: Option<PyObject>,
retries: usize,
sink_options: Wrap<SinkOptions>,
) -> PyResult<()> {
let options = JsonWriterOptions { maintain_order };

Expand All @@ -950,12 +938,15 @@ impl PyLazyFrame {
py.enter_polars(|| {
let ldf = self.ldf.clone();
match target {
SinkTarget::Path(path) => ldf.sink_json(path, options, cloud_options),
SinkTarget::Path(path) => {
ldf.sink_json(path, options, cloud_options, sink_options.0)
},
SinkTarget::Partition(partition) => ldf.sink_json_partitioned(
partition.path.as_ref(),
partition.variant,
options,
cloud_options,
sink_options.0,
),
}
})
Expand Down
2 changes: 2 additions & 0 deletions crates/polars-python/src/lazyframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ mod exitable;
mod general;
#[cfg(feature = "pymethods")]
mod serde;
mod sink;
pub mod visit;
pub mod visitor;

#[cfg(not(target_arch = "wasm32"))]
pub use exitable::PyInProcessQuery;
use polars::prelude::LazyFrame;
use pyo3::pyclass;
pub use sink::{PyPartitioning, SinkTarget};

#[pyclass]
#[repr(transparent)]
Expand Down
Loading

0 comments on commit fe56f69

Please sign in to comment.