diff --git a/crates/polars-expr/src/hash_keys.rs b/crates/polars-expr/src/hash_keys.rs index 129299db452e..639fa6fa8fdd 100644 --- a/crates/polars-expr/src/hash_keys.rs +++ b/crates/polars-expr/src/hash_keys.rs @@ -143,6 +143,13 @@ impl HashKeys { Self::Single(s) => Self::Single(s.gather(idxs)), } } + + pub fn sketch_cardinality(&self, sketch: &mut CardinalitySketch) { + match self { + HashKeys::RowEncoded(s) => s.sketch_cardinality(sketch), + HashKeys::Single(s) => s.sketch_cardinality(sketch), + } + } } #[derive(Clone, Debug)] @@ -231,6 +238,20 @@ impl RowEncodedKeys { keys, } } + + pub fn sketch_cardinality(&self, sketch: &mut CardinalitySketch) { + if let Some(validity) = self.keys.validity() { + for (h, is_v) in self.hashes.values_iter().zip(validity) { + if is_v { + sketch.insert(*h); + } + } + } else { + for h in self.hashes.values_iter() { + sketch.insert(*h); + } + } + } } /// Single keys. Does not pre-hash for boolean & integer types, only for strings @@ -284,4 +305,8 @@ impl SingleKeys { keys: self.keys.take_slice_unchecked(idxs), } } + + pub fn sketch_cardinality(&self, _sketch: &mut CardinalitySketch) { + todo!() + } } diff --git a/crates/polars-stream/src/nodes/joins/equi_join.rs b/crates/polars-stream/src/nodes/joins/equi_join.rs index be4625c0f796..76c7d37e102f 100644 --- a/crates/polars-stream/src/nodes/joins/equi_join.rs +++ b/crates/polars-stream/src/nodes/joins/equi_join.rs @@ -1,12 +1,14 @@ -use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, LazyLock}; use polars_core::prelude::*; use polars_core::schema::{Schema, SchemaExt}; use polars_core::series::IsSorted; use polars_core::utils::accumulate_dataframes_vertical_unchecked; -use polars_core::POOL; +use polars_core::{config, POOL}; use polars_expr::chunked_idx_table::{new_chunked_idx_table, ChunkedIdxTable}; use polars_expr::hash_keys::HashKeys; +use polars_io::pl_async::get_runtime; use polars_ops::frame::{JoinArgs, JoinType, MaintainOrderJoin}; use polars_ops::prelude::TakeChunked; use polars_ops::series::coalesce_columns; @@ -17,13 +19,23 @@ use polars_utils::pl_str::PlSmallStr; use polars_utils::{format_pl_smallstr, IdxSize}; use rayon::prelude::*; -use crate::async_primitives::connector::{Receiver, Sender}; +use crate::async_primitives::connector::{connector, Receiver, Sender}; use crate::async_primitives::wait_group::WaitGroup; use crate::expression::StreamExpr; use crate::morsel::{get_ideal_morsel_size, SourceToken}; use crate::nodes::compute_node_prelude::*; use crate::nodes::in_memory_source::InMemorySourceNode; +static SAMPLE_LIMIT: LazyLock = LazyLock::new(|| { + std::env::var("POLARS_JOIN_SAMPLE_LIMIT") + .map(|limit| limit.parse().unwrap()) + .unwrap_or(10_000_000) +}); + +// If one side is this much bigger than the other side we'll always use the +// smaller side as the build side without checking cardinalities. +const LOPSIDED_SAMPLE_FACTOR: usize = 10; + /// A payload selector contains for each column whether that column should be /// included in the payload, and if yes with what name. fn compute_payload_selector( @@ -138,6 +150,57 @@ fn select_payload(df: DataFrame, selector: &[Option]) -> DataFrame { .collect() } +#[derive(Default)] +struct SampleState { + left: Vec, + left_len: usize, + right: Vec, + right_len: usize, +} + +impl SampleState { + async fn sink( + mut recv: Receiver, + morsels: &mut Vec, + len: &mut usize, + this_final_len: Arc, + other_final_len: Arc, + ) -> PolarsResult<()> { + while let Ok(mut morsel) = recv.recv().await { + *len += morsel.df().height(); + if *len >= *SAMPLE_LIMIT + || *len + >= other_final_len + .load(Ordering::Relaxed) + .saturating_mul(LOPSIDED_SAMPLE_FACTOR) + { + morsel.source_token().stop(); + } + + drop(morsel.take_consume_token()); + morsels.push(morsel); + } + this_final_len.store(*len, Ordering::Relaxed); + Ok(()) + } +} + +fn estimate_cardinality( + morsels: &[Morsel], + key_selectors: &[StreamExpr], + params: &EquiJoinParams, + state: &ExecutionState, +) -> PolarsResult { + // TODO: parallelize. + let mut sketch = CardinalitySketch::new(); + for morsel in morsels { + let hash_keys = + get_runtime().block_on(select_keys(morsel.df(), key_selectors, params, state))?; + hash_keys.sketch_cardinality(&mut sketch); + } + Ok(sketch.estimate()) +} + #[derive(Default)] struct BuildPartition { hash_keys: Vec, @@ -145,8 +208,10 @@ struct BuildPartition { sketch: Option, } +#[derive(Default)] struct BuildState { partitions_per_worker: Vec>, + sampled_probe_morsels: Vec, } impl BuildState { @@ -163,7 +228,7 @@ impl BuildState { let mut sketches = vec![CardinalitySketch::default(); partitioner.num_partitions()]; let (key_selectors, payload_selector); - if params.left_is_build { + if params.left_is_build.unwrap() { payload_selector = ¶ms.left_payload_select; key_selectors = ¶ms.left_key_selectors; } else { @@ -273,7 +338,7 @@ impl BuildState { } let df = if combined_frames.is_empty() { - if params.left_is_build { + if params.left_is_build.unwrap() { DataFrame::empty_with_schema(¶ms.left_payload_schema) } else { DataFrame::empty_with_schema(¶ms.right_payload_schema) @@ -292,6 +357,8 @@ impl BuildState { ProbeState { table_per_partition, max_seq_sent: MorselSeq::default(), + sampled_probe_morsels: core::mem::take(&mut self.sampled_probe_morsels), + sampled_probe_morsel_idx: AtomicUsize::new(0), } }) } @@ -308,6 +375,8 @@ struct ProbeTable { struct ProbeState { table_per_partition: Vec, max_seq_sent: MorselSeq, + sampled_probe_morsels: Vec, + sampled_probe_morsel_idx: AtomicUsize, } impl ProbeState { @@ -331,7 +400,7 @@ impl ProbeState { let emit_unmatched = params.emit_unmatched_probe(); let (key_selectors, payload_selector); - if params.left_is_build { + if params.left_is_build.unwrap() { payload_selector = ¶ms.right_payload_select; key_selectors = ¶ms.right_key_selectors; } else { @@ -391,7 +460,7 @@ impl ProbeState { } let mut probe_df = payload.take_slice_unchecked_impl(&probe_match, false); - let mut out_df = if params.left_is_build { + let mut out_df = if params.left_is_build.unwrap() { build_df.hstack_mut_unchecked(probe_df.get_columns()); build_df } else { @@ -460,7 +529,7 @@ impl ProbeState { let mut probe_df = payload.take_slice_unchecked_impl(&probe_match, false); - let out_df = if params.left_is_build { + let out_df = if params.left_is_build.unwrap() { build_df.hstack_mut_unchecked(probe_df.get_columns()); build_df } else { @@ -519,7 +588,7 @@ impl ProbeState { let mut build_df = p.df.take_chunked_unchecked(&unmarked_idxs, IsSorted::Not, false); let len = build_df.height(); - let mut out_df = if params.left_is_build { + let mut out_df = if params.left_is_build.unwrap() { let probe_df = DataFrame::full_null(¶ms.right_payload_schema, len); build_df.hstack_mut_unchecked(probe_df.get_columns()); build_df @@ -571,6 +640,7 @@ impl Drop for ProbeState { POOL.install(|| { // Parallel drop as the state might be quite big. self.table_per_partition.par_drain(..).for_each(drop); + self.sampled_probe_morsels.par_drain(..).for_each(drop); }) } } @@ -618,7 +688,7 @@ impl EmitUnmatchedState { let mut build_df = p.df.take_chunked_unchecked(&unmarked_idxs, IsSorted::Not, false); let len = build_df.height(); - if params.left_is_build { + if params.left_is_build.unwrap() { let probe_df = DataFrame::full_null(¶ms.right_payload_schema, len); build_df.hstack_mut_unchecked(probe_df.get_columns()); build_df @@ -653,6 +723,7 @@ impl EmitUnmatchedState { } enum EquiJoinState { + Sample(SampleState), Build(BuildState), Probe(ProbeState), EmitUnmatchedBuild(EmitUnmatchedState), @@ -661,11 +732,12 @@ enum EquiJoinState { } struct EquiJoinParams { - left_is_build: bool, + left_is_build: Option, preserve_order_build: bool, preserve_order_probe: bool, left_key_schema: Arc, left_key_selectors: Vec, + right_key_schema: Arc, right_key_selectors: Vec, left_payload_select: Vec>, right_payload_select: Vec>, @@ -678,7 +750,7 @@ struct EquiJoinParams { impl EquiJoinParams { /// Should we emit unmatched rows from the build side? fn emit_unmatched_build(&self) -> bool { - if self.left_is_build { + if self.left_is_build.unwrap() { self.args.how == JoinType::Left || self.args.how == JoinType::Full } else { self.args.how == JoinType::Right || self.args.how == JoinType::Full @@ -687,7 +759,7 @@ impl EquiJoinParams { /// Should we emit unmatched rows from the probe side? fn emit_unmatched_probe(&self) -> bool { - if self.left_is_build { + if self.left_is_build.unwrap() { self.args.how == JoinType::Right || self.args.how == JoinType::Full } else { self.args.how == JoinType::Left || self.args.how == JoinType::Full @@ -699,7 +771,7 @@ pub struct EquiJoinNode { state: EquiJoinState, params: EquiJoinParams, num_pipelines: usize, - table: Box, + table: Option>, } impl EquiJoinNode { @@ -713,11 +785,25 @@ impl EquiJoinNode { args: JoinArgs, ) -> PolarsResult { let left_is_build = match args.maintain_order { - // TODO: use cardinality estimation to determine build side when not order-preserving. - MaintainOrderJoin::None => args.how != JoinType::Left, - MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight => false, - MaintainOrderJoin::Right | MaintainOrderJoin::RightLeft => true, + MaintainOrderJoin::None => { + if *SAMPLE_LIMIT == 0 { + Some(true) + } else { + None + } + }, + MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight => Some(false), + MaintainOrderJoin::Right | MaintainOrderJoin::RightLeft => Some(true), }; + + let table = left_is_build.map(|lib| { + if lib { + new_chunked_idx_table(left_key_schema.clone()) + } else { + new_chunked_idx_table(right_key_schema.clone()) + } + }); + let preserve_order_probe = args.maintain_order != MaintainOrderJoin::None; let preserve_order_build = matches!( args.maintain_order, @@ -739,18 +825,16 @@ impl EquiJoinNode { &args, )?; - let table = if left_is_build { - new_chunked_idx_table(left_key_schema.clone()) + let state = if left_is_build.is_some() { + EquiJoinState::Build(BuildState::default()) } else { - new_chunked_idx_table(right_key_schema) + EquiJoinState::Sample(SampleState::default()) }; let left_payload_schema = select_schema(&left_input_schema, &left_payload_select); let right_payload_schema = select_schema(&right_input_schema, &right_payload_select); Ok(Self { - state: EquiJoinState::Build(BuildState { - partitions_per_worker: Vec::new(), - }), + state, num_pipelines: 0, params: EquiJoinParams { left_is_build, @@ -758,6 +842,7 @@ impl EquiJoinNode { preserve_order_probe, left_key_schema, left_key_selectors, + right_key_schema, right_key_selectors, left_payload_select, right_payload_select, @@ -771,6 +856,54 @@ impl EquiJoinNode { } } +// Not ideal - doesn't support stop requests before all cached items are flushed. +#[expect(clippy::needless_lifetimes)] +fn insert_cached_into_parallel_stream<'s, 'env>( + cached: &'s [Morsel], + cached_idx: &'s AtomicUsize, + num_pipelines: usize, + recv_port: Option>, + scope: &'s TaskScope<'s, 'env>, + join_handles: &mut Vec>>, +) -> Option>> { + if cached_idx.load(Ordering::Relaxed) >= cached.len() { + return recv_port.map(|p| p.parallel()); + } + + let receivers = if let Some(p) = recv_port { + p.parallel().into_iter().map(Some).collect_vec() + } else { + (0..num_pipelines).map(|_| None).collect_vec() + }; + + let mut out = Vec::new(); + for orig_recv in receivers { + let (mut new_send, new_recv) = connector(); + out.push(new_recv); + join_handles.push(scope.spawn_task(TaskPriority::High, async move { + loop { + let idx = cached_idx.fetch_add(1, Ordering::Relaxed); + if idx >= cached.len() { + break; + } + if new_send.send(cached[idx].clone()).await.is_err() { + break; + } + } + + if let Some(mut recv) = orig_recv { + while let Ok(morsel) = recv.recv().await { + if new_send.send(morsel).await.is_err() { + break; + } + } + } + Ok(()) + })); + } + Some(out) +} + impl ComputeNode for EquiJoinNode { fn name(&self) -> &str { "equi_join" @@ -783,25 +916,190 @@ impl ComputeNode for EquiJoinNode { fn update_state(&mut self, recv: &mut [PortState], send: &mut [PortState]) -> PolarsResult<()> { assert!(recv.len() == 2 && send.len() == 1); - let build_idx = if self.params.left_is_build { 0 } else { 1 }; - let probe_idx = 1 - build_idx; - // If the output doesn't want any more data, transition to being done. if send[0] == PortState::Done { self.state = EquiJoinState::Done; } + // If we are sampling and both sides are done/filled, transition to building. + if let EquiJoinState::Sample(sample_state) = &mut self.state { + let left_saturated = sample_state.left_len >= *SAMPLE_LIMIT; + let right_saturated = sample_state.right_len >= *SAMPLE_LIMIT; + let left_done = recv[0] == PortState::Done || left_saturated; + let right_done = recv[1] == PortState::Done || right_saturated; + #[expect(clippy::nonminimal_bool)] + let stop_sampling = (left_done && right_done) + || (left_done + && sample_state.right_len >= LOPSIDED_SAMPLE_FACTOR * sample_state.left_len) + || (right_done + && sample_state.left_len >= LOPSIDED_SAMPLE_FACTOR * sample_state.right_len); + if stop_sampling { + if config::verbose() { + eprintln!( + "choosing equi-join build side, sample lengths are: {} vs. {}", + sample_state.left_len, sample_state.right_len + ); + } + + let estimate_cardinalities = || { + let execution_state = ExecutionState::new(); + let left_cardinality = estimate_cardinality( + &sample_state.left, + &self.params.left_key_selectors, + &self.params, + &execution_state, + )?; + let right_cardinality = estimate_cardinality( + &sample_state.right, + &self.params.right_key_selectors, + &self.params, + &execution_state, + )?; + let norm_left_factor = sample_state.left_len.min(*SAMPLE_LIMIT) as f64 + / sample_state.left_len as f64; + let norm_right_factor = sample_state.right_len.min(*SAMPLE_LIMIT) as f64 + / sample_state.right_len as f64; + let norm_left_cardinality = + (left_cardinality as f64 * norm_left_factor) as usize; + let norm_right_cardinality = + (right_cardinality as f64 * norm_right_factor) as usize; + if config::verbose() { + eprintln!("estimated cardinalities are: {norm_left_cardinality} vs. {norm_right_cardinality}"); + } + PolarsResult::Ok((norm_left_cardinality, norm_right_cardinality)) + }; + + let left_is_build = match (left_saturated, right_saturated) { + (false, false) => { + if sample_state.left_len * LOPSIDED_SAMPLE_FACTOR < sample_state.right_len + || sample_state.left_len + > sample_state.right_len * LOPSIDED_SAMPLE_FACTOR + { + // Don't bother estimating cardinality, just choose smaller as it's highly + // imbalanced. + sample_state.left_len < sample_state.right_len + } else { + let (lc, rc) = estimate_cardinalities()?; + // Let's assume for now that per element building a + // table is 3x more expensive than a probe, with + // unique keys getting an additional 3x factor for + // having to update the hash table in addition to the probe. + let left_build_cost = sample_state.left_len * 3 + 3 * lc; + let left_probe_cost = sample_state.left_len; + let right_build_cost = sample_state.right_len * 3 + 3 * rc; + let right_probe_cost = sample_state.right_len; + left_build_cost + right_probe_cost < left_probe_cost + right_build_cost + } + }, + + // Choose the unsaturated side, the saturated side could be + // arbitrarily big. + (false, true) => true, + (true, false) => false, + + // Estimate cardinality and choose smaller. + (true, true) => { + let (lc, rc) = estimate_cardinalities()?; + lc < rc + }, + }; + if config::verbose() { + eprintln!( + "build side chosen: {}", + if left_is_build { "left" } else { "right" } + ); + } + + // Transition to building state. + self.params.left_is_build = Some(left_is_build); + self.table = Some(if left_is_build { + new_chunked_idx_table(self.params.left_key_schema.clone()) + } else { + new_chunked_idx_table(self.params.right_key_schema.clone()) + }); + let mut sampled_build_morsels = core::mem::take(&mut sample_state.left); + let mut sampled_probe_morsels = core::mem::take(&mut sample_state.right); + if !left_is_build { + core::mem::swap(&mut sampled_build_morsels, &mut sampled_probe_morsels); + } + + let partitioner = HashPartitioner::new(self.num_pipelines, 0); + let mut build_state = BuildState { + partitions_per_worker: (0..self.num_pipelines).map(|_| Vec::new()).collect(), + sampled_probe_morsels, + }; + + // Simulate the sample build morsels flowing into the build side. + if !sampled_build_morsels.is_empty() { + let state = ExecutionState::new(); + let sampled_build_morsel_idx = AtomicUsize::new(0); + crate::async_executor::task_scope(|scope| { + let mut join_handles = Vec::new(); + let receivers = insert_cached_into_parallel_stream( + &sampled_build_morsels, + &sampled_build_morsel_idx, + self.num_pipelines, + None, + scope, + &mut join_handles, + ) + .unwrap(); + + for (worker_ps, recv) in + build_state.partitions_per_worker.iter_mut().zip(receivers) + { + join_handles.push(scope.spawn_task( + TaskPriority::High, + BuildState::partition_and_sink( + recv, + worker_ps, + partitioner.clone(), + &self.params, + &state, + ), + )); + } + + polars_io::pl_async::get_runtime().block_on(async move { + for handle in join_handles { + handle.await?; + } + PolarsResult::Ok(()) + }) + })?; + } + + POOL.install(|| { + // Parallel drop as the state might be quite big. + sampled_build_morsels.into_par_iter().for_each(drop); + }); + + self.state = EquiJoinState::Build(build_state); + } + } + + let build_idx = if self.params.left_is_build == Some(true) { + 0 + } else { + 1 + }; + let probe_idx = 1 - build_idx; + // If we are building and the build input is done, transition to probing. if let EquiJoinState::Build(build_state) = &mut self.state { if recv[build_idx] == PortState::Done { - self.state = EquiJoinState::Probe(build_state.finalize(&self.params, &*self.table)); + self.state = EquiJoinState::Probe( + build_state.finalize(&self.params, self.table.as_deref().unwrap()), + ); } } // If we are probing and the probe input is done, emit unmatched if // necessary, otherwise we're done. if let EquiJoinState::Probe(probe_state) = &mut self.state { - if recv[probe_idx] == PortState::Done { + let samples_consumed = probe_state.sampled_probe_morsel_idx.load(Ordering::Relaxed) + >= probe_state.sampled_probe_morsels.len(); + if samples_consumed && recv[probe_idx] == PortState::Done { if self.params.emit_unmatched_build() { if self.params.preserve_order_build { let partitioner = HashPartitioner::new(self.num_pipelines, 0); @@ -834,13 +1132,44 @@ impl ComputeNode for EquiJoinNode { } match &mut self.state { + EquiJoinState::Sample(sample_state) => { + send[0] = PortState::Blocked; + if recv[0] != PortState::Done { + recv[0] = if sample_state.left_len < *SAMPLE_LIMIT { + PortState::Ready + } else { + PortState::Blocked + }; + } + if recv[1] != PortState::Done { + recv[1] = if sample_state.right_len < *SAMPLE_LIMIT { + PortState::Ready + } else { + PortState::Blocked + }; + } + }, EquiJoinState::Build(_) => { send[0] = PortState::Blocked; - recv[build_idx] = PortState::Ready; - recv[probe_idx] = PortState::Blocked; + if recv[build_idx] != PortState::Done { + recv[build_idx] = PortState::Ready; + } + if recv[probe_idx] != PortState::Done { + recv[probe_idx] = PortState::Blocked; + } }, - EquiJoinState::Probe(_) => { - core::mem::swap(&mut send[0], &mut recv[probe_idx]); + EquiJoinState::Probe(probe_state) => { + if recv[probe_idx] != PortState::Done { + core::mem::swap(&mut send[0], &mut recv[probe_idx]); + } else { + send[0] = if probe_state.sampled_probe_morsel_idx.load(Ordering::Relaxed) + < probe_state.sampled_probe_morsels.len() + { + PortState::Ready + } else { + PortState::Done + }; + } recv[build_idx] = PortState::Done; }, EquiJoinState::EmitUnmatchedBuild(_) => { @@ -880,10 +1209,52 @@ impl ComputeNode for EquiJoinNode { assert!(recv_ports.len() == 2); assert!(send_ports.len() == 1); - let build_idx = if self.params.left_is_build { 0 } else { 1 }; + let build_idx = if self.params.left_is_build == Some(true) { + 0 + } else { + 1 + }; let probe_idx = 1 - build_idx; match &mut self.state { + EquiJoinState::Sample(sample_state) => { + assert!(send_ports[0].is_none()); + let left_final_len = Arc::new(AtomicUsize::new(if recv_ports[0].is_none() { + sample_state.left_len + } else { + usize::MAX + })); + let right_final_len = Arc::new(AtomicUsize::new(if recv_ports[1].is_none() { + sample_state.right_len + } else { + usize::MAX + })); + + if let Some(left_recv) = recv_ports[0].take() { + join_handles.push(scope.spawn_task( + TaskPriority::High, + SampleState::sink( + left_recv.serial(), + &mut sample_state.left, + &mut sample_state.left_len, + left_final_len.clone(), + right_final_len.clone(), + ), + )); + } + if let Some(right_recv) = recv_ports[1].take() { + join_handles.push(scope.spawn_task( + TaskPriority::High, + SampleState::sink( + right_recv.serial(), + &mut sample_state.right, + &mut sample_state.right_len, + right_final_len, + left_final_len, + ), + )); + } + }, EquiJoinState::Build(build_state) => { assert!(send_ports[0].is_none()); assert!(recv_ports[probe_idx].is_none()); @@ -909,8 +1280,16 @@ impl ComputeNode for EquiJoinNode { }, EquiJoinState::Probe(probe_state) => { assert!(recv_ports[build_idx].is_none()); - let receivers = recv_ports[probe_idx].take().unwrap().parallel(); let senders = send_ports[0].take().unwrap().parallel(); + let receivers = insert_cached_into_parallel_stream( + &probe_state.sampled_probe_morsels, + &probe_state.sampled_probe_morsel_idx, + self.num_pipelines, + recv_ports[probe_idx].take(), + scope, + join_handles, + ) + .unwrap(); let partitioner = HashPartitioner::new(self.num_pipelines, 0); let probe_tasks = receivers diff --git a/py-polars/tests/unit/datatypes/test_categorical.py b/py-polars/tests/unit/datatypes/test_categorical.py index 328489cd96ac..81d3dd3171e4 100644 --- a/py-polars/tests/unit/datatypes/test_categorical.py +++ b/py-polars/tests/unit/datatypes/test_categorical.py @@ -66,7 +66,7 @@ def test_categorical_full_outer_join() -> None: ] ) - df = dfa.join(dfb, on="key", how="full") + df = dfa.join(dfb, on="key", how="full", maintain_order="right_left") # the cast is important to test the rev map assert df["key"].cast(pl.String).to_list() == ["bar", None, "foo"] assert df["key_right"].cast(pl.String).to_list() == ["bar", "baz", None] diff --git a/py-polars/tests/unit/operations/test_join.py b/py-polars/tests/unit/operations/test_join.py index dd437b5e934a..bc2685441fb9 100644 --- a/py-polars/tests/unit/operations/test_join.py +++ b/py-polars/tests/unit/operations/test_join.py @@ -826,7 +826,7 @@ def test_full_outer_join_coalesce_different_names_13450() -> None: ) out = df1.join(df2, left_on="L1", right_on="L3", how="full", coalesce=True) - assert_frame_equal(out, expected) + assert_frame_equal(out, expected, check_row_order=False) # https://github.com/pola-rs/polars/issues/10663 diff --git a/py-polars/tests/unit/operations/unique/test_unique.py b/py-polars/tests/unit/operations/unique/test_unique.py index d41c72b1770a..9c6b1d096f88 100644 --- a/py-polars/tests/unit/operations/unique/test_unique.py +++ b/py-polars/tests/unit/operations/unique/test_unique.py @@ -108,13 +108,15 @@ def test_struct_unique_df() -> None: def test_sorted_unique_dates() -> None: - assert ( + out = ( pl.DataFrame( [pl.Series("dt", [date(2015, 6, 24), date(2015, 6, 23)], dtype=pl.Date)] ) .sort("dt") .unique(maintain_order=False) - ).to_dict(as_series=False) == {"dt": [date(2015, 6, 23), date(2015, 6, 24)]} + ) + expected = pl.DataFrame({"dt": [date(2015, 6, 23), date(2015, 6, 24)]}) + assert_frame_equal(out, expected, check_row_order=False) @pytest.mark.parametrize("maintain_order", [True, False]) diff --git a/py-polars/tests/unit/streaming/test_streaming_join.py b/py-polars/tests/unit/streaming/test_streaming_join.py index 9edea0ae8b3f..9a749c1da051 100644 --- a/py-polars/tests/unit/streaming/test_streaming_join.py +++ b/py-polars/tests/unit/streaming/test_streaming_join.py @@ -165,7 +165,9 @@ def test_join_null_matches(streaming: bool) -> None: {"idx_a": [0, 1, 2], "a": [None, 1, 2], "idx_b": [None, 2, 1]} ) assert_frame_equal( - df_a.join(df_b, on="a", how="left").collect(streaming=streaming), expected + df_a.join(df_b, on="a", how="left").collect(streaming=streaming), + expected, + check_row_order=False, ) # Full outer expected = pl.DataFrame( @@ -202,6 +204,7 @@ def test_join_null_matches_multiple_keys(streaming: bool) -> None: assert_frame_equal( df_a.join(df_b, on=["a", "idx"], how="inner").collect(streaming=streaming), expected, + check_row_order=False, ) expected = pl.DataFrame( {"a": [None, 1, 2], "idx": [0, 1, 2], "c": [None, 50, None]} @@ -209,6 +212,7 @@ def test_join_null_matches_multiple_keys(streaming: bool) -> None: assert_frame_equal( df_a.join(df_b, on=["a", "idx"], how="left").collect(streaming=streaming), expected, + check_row_order=False, ) expected = pl.DataFrame( diff --git a/py-polars/tests/unit/test_projections.py b/py-polars/tests/unit/test_projections.py index 6bf3afb55657..5b601ff5b3ec 100644 --- a/py-polars/tests/unit/test_projections.py +++ b/py-polars/tests/unit/test_projections.py @@ -389,6 +389,7 @@ def test_schema_full_outer_join_projection_pd_13287() -> None: how="full", left_on="a", right_on="c", + maintain_order="right_left", ).with_columns( pl.col("a").fill_null(pl.col("c")), ).select("a").collect().to_dict(as_series=False) == {"a": [2, 3, 1, 1]}