Skip to content

Commit

Permalink
Merge pull request #5 from pola-rs/main
Browse files Browse the repository at this point in the history
Sync with base
  • Loading branch information
lisphilar authored May 4, 2024
2 parents 4291b22 + 6730a72 commit 51ef3b7
Show file tree
Hide file tree
Showing 56 changed files with 1,311 additions and 421 deletions.
1 change: 1 addition & 0 deletions crates/polars-arrow/src/compute/cast/binary_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ pub fn binary_to_dictionary<O: Offset, K: DictionaryKey>(
from: &BinaryArray<O>,
) -> PolarsResult<DictionaryArray<K>> {
let mut array = MutableDictionaryArray::<K, MutableBinaryArray<O>>::new();
array.reserve(from.len());
array.try_extend(from.iter())?;

Ok(array.into())
Expand Down
2 changes: 2 additions & 0 deletions crates/polars-arrow/src/compute/cast/binview_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub(super) fn binview_to_dictionary<K: DictionaryKey>(
from: &BinaryViewArray,
) -> PolarsResult<DictionaryArray<K>> {
let mut array = MutableDictionaryArray::<K, MutableBinaryViewArray<[u8]>>::new();
array.reserve(from.len());
array.try_extend(from.iter())?;

Ok(array.into())
Expand All @@ -30,6 +31,7 @@ pub(super) fn utf8view_to_dictionary<K: DictionaryKey>(
from: &Utf8ViewArray,
) -> PolarsResult<DictionaryArray<K>> {
let mut array = MutableDictionaryArray::<K, MutableBinaryViewArray<str>>::new();
array.reserve(from.len());
array.try_extend(from.iter())?;

Ok(array.into())
Expand Down
1 change: 1 addition & 0 deletions crates/polars-arrow/src/compute/cast/primitive_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ pub fn primitive_to_dictionary<T: NativeType + Eq + Hash, K: DictionaryKey>(
let mut array = MutableDictionaryArray::<K, _>::try_empty(MutablePrimitiveArray::<T>::from(
from.data_type().clone(),
))?;
array.reserve(from.len());
array.try_extend(iter)?;

Ok(array.into())
Expand Down
1 change: 1 addition & 0 deletions crates/polars-arrow/src/compute/cast/utf8_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub fn utf8_to_dictionary<O: Offset, K: DictionaryKey>(
from: &Utf8Array<O>,
) -> PolarsResult<DictionaryArray<K>> {
let mut array = MutableDictionaryArray::<K, MutableUtf8Array<O>>::new();
array.reserve(from.len());
array.try_extend(from.iter())?;

Ok(array.into())
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-io/src/parquet/write/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ where
WriteOptions {
write_statistics: self.statistics,
compression: self.compression,
version: Version::V2,
version: Version::V1,
data_pagesize_limit: self.data_page_size,
}
}
Expand Down
113 changes: 8 additions & 105 deletions crates/polars-lazy/src/dsl/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,14 @@
//!
use polars_core::prelude::*;
pub use polars_plan::dsl::functions::*;
use polars_plan::prelude::UnionArgs;
use rayon::prelude::*;

use crate::prelude::*;

pub(crate) fn concat_impl<L: AsRef<[LazyFrame]>>(
inputs: L,
rechunk: bool,
parallel: bool,
from_partitioned_ds: bool,
convert_supertypes: bool,
args: UnionArgs,
) -> PolarsResult<LazyFrame> {
let mut inputs = inputs.as_ref().to_vec();

Expand All @@ -24,12 +22,6 @@ pub(crate) fn concat_impl<L: AsRef<[LazyFrame]>>(
);

let mut opt_state = lf.opt_state;
let options = UnionOptions {
parallel,
from_partitioned_ds,
rechunk,
..Default::default()
};

let mut lps = Vec::with_capacity(inputs.len());
lps.push(lf.logical_plan);
Expand All @@ -41,11 +33,7 @@ pub(crate) fn concat_impl<L: AsRef<[LazyFrame]>>(
lps.push(lp)
}

let lp = DslPlan::Union {
inputs: lps,
options,
convert_supertypes,
};
let lp = DslPlan::Union { inputs: lps, args };
let mut lf = LazyFrame::from(lp);
lf.opt_state = opt_state;
Ok(lf)
Expand All @@ -56,58 +44,10 @@ pub(crate) fn concat_impl<L: AsRef<[LazyFrame]>>(
/// Calls [`concat`][concat()] internally.
pub fn concat_lf_diagonal<L: AsRef<[LazyFrame]>>(
inputs: L,
args: UnionArgs,
mut args: UnionArgs,
) -> PolarsResult<LazyFrame> {
let lfs = inputs.as_ref();
let schemas = lfs
.iter()
.map(|lf| lf.schema())
.collect::<PolarsResult<Vec<_>>>()?;

let upper_bound_width = schemas.iter().map(|sch| sch.len()).sum();

// Use Vec to preserve order
let mut column_names = Vec::with_capacity(upper_bound_width);
let mut total_schema = Vec::with_capacity(upper_bound_width);

for sch in schemas.iter() {
sch.iter().for_each(|(name, dtype)| {
if !column_names.contains(name) {
column_names.push(name.clone());
total_schema.push((name.clone(), dtype.clone()));
}
});
}
let lfs_with_all_columns = lfs
.iter()
// Zip Frames with their Schemas
.zip(schemas)
.filter_map(|(lf, lf_schema)| {
if lf_schema.is_empty() {
// if the frame is empty we discard
return None;
};

let mut lf = lf.clone();
for (name, dtype) in total_schema.iter() {
// If a name from Total Schema is not present - append
if lf_schema.get_field(name).is_none() {
lf = lf.with_column(NULL.lit().cast(dtype.clone()).alias(name))
}
}

// Now, reorder to match schema
let reordered_lf = lf.select(
column_names
.iter()
.map(|col_name| col(col_name))
.collect::<Vec<Expr>>(),
);
Some(Ok(reordered_lf))
})
.collect::<PolarsResult<Vec<_>>>()?;

concat(lfs_with_all_columns, args)
args.diagonal = true;
concat_impl(inputs, args)
}

/// Concat [LazyFrame]s horizontally.
Expand All @@ -125,25 +65,11 @@ pub fn concat_lf_horizontal<L: AsRef<[LazyFrame]>>(
opt_state.file_caching |= lf.opt_state.file_caching;
}

let mut lps = Vec::with_capacity(lfs.len());
let mut schemas = Vec::with_capacity(lfs.len());

for lf in lfs.iter() {
let mut lf = lf.clone();
let schema = lf.schema()?;
schemas.push(schema);
let lp = std::mem::take(&mut lf.logical_plan);
lps.push(lp);
}

let combined_schema = merge_schemas(&schemas)?;

let options = HConcatOptions {
parallel: args.parallel,
};
let lp = DslPlan::HConcat {
inputs: lps,
schema: Arc::new(combined_schema),
inputs: lfs.iter().map(|lf| lf.logical_plan.clone()).collect(),
options,
};
let mut lf = LazyFrame::from(lp);
Expand All @@ -152,32 +78,9 @@ pub fn concat_lf_horizontal<L: AsRef<[LazyFrame]>>(
Ok(lf)
}

#[derive(Clone, Copy)]
pub struct UnionArgs {
pub parallel: bool,
pub rechunk: bool,
pub to_supertypes: bool,
}

impl Default for UnionArgs {
fn default() -> Self {
Self {
parallel: true,
rechunk: true,
to_supertypes: false,
}
}
}

/// Concat multiple [`LazyFrame`]s vertically.
pub fn concat<L: AsRef<[LazyFrame]>>(inputs: L, args: UnionArgs) -> PolarsResult<LazyFrame> {
concat_impl(
inputs,
args.rechunk,
args.parallel,
false,
args.to_supertypes,
)
concat_impl(inputs, args)
}

/// Collect all [`LazyFrame`] computations.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,12 @@ use super::*;
pub struct ProjectionSimple {
pub(crate) input: Box<dyn Executor>,
pub(crate) columns: SchemaRef,
pub(crate) duplicate_check: bool,
}

impl ProjectionSimple {
fn execute_impl(&mut self, df: DataFrame, columns: &[SmartString]) -> PolarsResult<DataFrame> {
if self.duplicate_check {
df._select_impl(columns.as_ref())
} else {
df._select_impl_unchecked(columns.as_ref())
}
// No duplicate check as that an invariant of this node.
df._select_impl_unchecked(columns.as_ref())
}
}

Expand Down
12 changes: 2 additions & 10 deletions crates/polars-lazy/src/physical_plan/planner/lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,17 +598,9 @@ pub fn create_physical_plan(
.collect::<PolarsResult<_>>()?;
Ok(Box::new(executors::ExternalContext { input, contexts }))
},
SimpleProjection {
input,
columns,
duplicate_check,
} => {
SimpleProjection { input, columns } => {
let input = create_physical_plan(input, lp_arena, expr_arena)?;
let exec = executors::ProjectionSimple {
input,
columns,
duplicate_check,
};
let exec = executors::ProjectionSimple { input, columns };
Ok(Box::new(exec))
},
Invalid => unreachable!(),
Expand Down
9 changes: 8 additions & 1 deletion crates/polars-lazy/src/physical_plan/streaming/checks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,14 @@ pub(super) fn streamable_join(args: &JoinArgs) -> bool {
let supported = match args.how {
#[cfg(feature = "cross_join")]
JoinType::Cross => true,
JoinType::Inner | JoinType::Left | JoinType::Outer { .. } => true,
JoinType::Inner | JoinType::Left => {
// no-coalescing not yet supported in streaming
matches!(
args.coalesce,
JoinCoalesce::JoinSpecific | JoinCoalesce::CoalesceColumns
)
},
JoinType::Outer { .. } => true,
_ => false,
};
supported && !args.validation.needs_checks()
Expand Down
1 change: 1 addition & 0 deletions crates/polars-lazy/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub use polars_plan::logical_plan::{
AnonymousScan, AnonymousScanArgs, AnonymousScanOptions, DslPlan, Literal, LiteralValue, Null,
NULL,
};
pub use polars_plan::prelude::UnionArgs;
pub(crate) use polars_plan::prelude::*;
#[cfg(feature = "rolling_window")]
pub use polars_time::{prelude::RollingOptions, Duration};
Expand Down
9 changes: 8 additions & 1 deletion crates/polars-lazy/src/scan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,13 @@ impl LazyFileListReader for LazyCsvReader {

fn concat_impl(&self, lfs: Vec<LazyFrame>) -> PolarsResult<LazyFrame> {
// set to false, as the csv parser has full thread utilization
concat_impl(&lfs, self.rechunk(), false, true, false)
let args = UnionArgs {
rechunk: self.rechunk(),
parallel: false,
to_supertypes: false,
from_partitioned_ds: true,
..Default::default()
};
concat_impl(&lfs, args)
}
}
10 changes: 9 additions & 1 deletion crates/polars-lazy/src/scan/file_list_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use polars_core::prelude::*;
use polars_io::cloud::CloudOptions;
use polars_io::utils::is_cloud_url;
use polars_io::RowIndex;
use polars_plan::prelude::UnionArgs;

use crate::prelude::*;

Expand Down Expand Up @@ -83,7 +84,14 @@ pub trait LazyFileListReader: Clone {
/// This method should not take into consideration [LazyFileListReader::n_rows]
/// nor [LazyFileListReader::row_index].
fn concat_impl(&self, lfs: Vec<LazyFrame>) -> PolarsResult<LazyFrame> {
concat_impl(&lfs, self.rechunk(), true, true, false)
let args = UnionArgs {
rechunk: self.rechunk(),
parallel: true,
to_supertypes: false,
from_partitioned_ds: true,
..Default::default()
};
concat_impl(&lfs, args)
}

/// Get the final [LazyFrame].
Expand Down
29 changes: 17 additions & 12 deletions crates/polars-ops/src/frame/join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ pub trait DataFrameJoinOps: IntoDf {
let left_df = self.to_df();
args.validation.is_valid_join(&args.how)?;

let should_coalesce = args.coalesce.coalesce(&args.how);

#[cfg(feature = "cross_join")]
if let JoinType::Cross = args.how {
return left_df.cross_join(other, args.suffix.as_deref(), args.slice);
Expand Down Expand Up @@ -202,13 +204,12 @@ pub trait DataFrameJoinOps: IntoDf {
if selected_left.len() == 1 {
let s_left = &selected_left[0];
let s_right = &selected_right[0];
let drop_names: Option<&[&str]> = if should_coalesce { None } else { Some(&[]) };
return match args.how {
JoinType::Inner => {
left_df._inner_join_from_series(other, s_left, s_right, args, _verbose, None)
},
JoinType::Left => {
left_df._left_join_from_series(other, s_left, s_right, args, _verbose, None)
},
JoinType::Inner => left_df
._inner_join_from_series(other, s_left, s_right, args, _verbose, drop_names),
JoinType::Left => left_df
._left_join_from_series(other, s_left, s_right, args, _verbose, drop_names),
JoinType::Outer => left_df._outer_join_from_series(other, s_left, s_right, args),
#[cfg(feature = "semi_anti_join")]
JoinType::Anti => left_df._semi_anti_join_from_series(
Expand Down Expand Up @@ -265,7 +266,12 @@ pub trait DataFrameJoinOps: IntoDf {

let lhs_keys = prepare_keys_multiple(&selected_left, args.join_nulls)?.into_series();
let rhs_keys = prepare_keys_multiple(&selected_right, args.join_nulls)?.into_series();
let names_right = selected_right.iter().map(|s| s.name()).collect::<Vec<_>>();

let drop_names = if should_coalesce {
Some(selected_right.iter().map(|s| s.name()).collect::<Vec<_>>())
} else {
Some(vec![])
};

// Multiple keys.
match args.how {
Expand All @@ -278,16 +284,15 @@ pub trait DataFrameJoinOps: IntoDf {
},
JoinType::Outer => {
let names_left = selected_left.iter().map(|s| s.name()).collect::<Vec<_>>();
let coalesce = args.coalesce;
args.coalesce = JoinCoalesce::KeepColumns;
let suffix = args.suffix.clone();
let out = left_df._outer_join_from_series(other, &lhs_keys, &rhs_keys, args);

if coalesce.coalesce(&JoinType::Outer) {
if should_coalesce {
Ok(_coalesce_outer_join(
out?,
&names_left,
&names_right,
drop_names.as_ref().unwrap(),
suffix.as_deref(),
left_df,
))
Expand All @@ -301,15 +306,15 @@ pub trait DataFrameJoinOps: IntoDf {
&rhs_keys,
args,
_verbose,
Some(&names_right),
drop_names.as_deref(),
),
JoinType::Left => left_df._left_join_from_series(
other,
&lhs_keys,
&rhs_keys,
args,
_verbose,
Some(&names_right),
drop_names.as_deref(),
),
#[cfg(feature = "semi_anti_join")]
JoinType::Anti | JoinType::Semi => self._join_impl(
Expand Down
Loading

0 comments on commit 51ef3b7

Please sign in to comment.