diff --git a/crates/polars-arrow/src/types/native.rs b/crates/polars-arrow/src/types/native.rs
index 6f869df32602..230fdde387d1 100644
--- a/crates/polars-arrow/src/types/native.rs
+++ b/crates/polars-arrow/src/types/native.rs
@@ -1,10 +1,11 @@
+use std::hash::{Hash, Hasher};
 use std::ops::Neg;
 use std::panic::RefUnwindSafe;
 
 use bytemuck::{Pod, Zeroable};
 use polars_utils::min_max::MinMax;
 use polars_utils::nulls::IsNull;
-use polars_utils::total_ord::{TotalEq, TotalOrd};
+use polars_utils::total_ord::{ToTotalOrd, TotalEq, TotalHash, TotalOrd, TotalOrdWrap};
 
 use super::PrimitiveType;
 
@@ -434,6 +435,44 @@ impl PartialEq for f16 {
     }
 }
 
+/// Converts an f16 into a canonical form, where -0 == 0 and all NaNs map to
+/// the same value.
+#[inline]
+pub fn canonical_f16(x: f16) -> f16 {
+    // zero out the sign bit if the f16 is zero.
+    let convert_zero = f16(x.0 & (0x7FFF | (u16::from(x.0 & 0x7FFF != 0) << 15)));
+    if convert_zero.is_nan() {
+        f16::from_bits(0x7e00) // Canonical quiet NaN.
+    } else {
+        convert_zero
+    }
+}
+
+impl TotalHash for f16 {
+    #[inline(always)]
+    fn tot_hash<H>(&self, state: &mut H)
+    where
+        H: Hasher,
+    {
+        canonical_f16(*self).to_bits().hash(state)
+    }
+}
+
+impl ToTotalOrd for f16 {
+    type TotalOrdItem = TotalOrdWrap<f16>;
+    type SourceItem = f16;
+
+    #[inline]
+    fn to_total_ord(&self) -> Self::TotalOrdItem {
+        TotalOrdWrap(*self)
+    }
+
+    #[inline]
+    fn peel_total_ord(ord_item: Self::TotalOrdItem) -> Self::SourceItem {
+        ord_item.0
+    }
+}
+
 impl IsNull for f16 {
     const HAS_NULLS: bool = false;
     type Inner = f16;
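The `TotalHash` impl above depends on that canonicalisation so every logical value hashes to exactly one bit pattern. A std-only f32 analogue of the same idea (illustrative names, not part of this patch):

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Fold -0.0 into +0.0 and collapse every NaN payload onto one canonical
// quiet-NaN bit pattern before hashing.
fn canonical(x: f32) -> f32 {
    let x = x + 0.0; // -0.0 + 0.0 == +0.0
    if x.is_nan() { f32::from_bits(0x7fc0_0000) } else { x }
}

fn hash_value(x: f32) -> u64 {
    let mut h = DefaultHasher::new();
    canonical(x).to_bits().hash(&mut h);
    h.finish()
}

fn main() {
    assert_eq!(hash_value(0.0), hash_value(-0.0)); // zeros collide on purpose
    assert_eq!(hash_value(f32::NAN), hash_value(-f32::NAN)); // all NaNs collide
    assert_ne!(canonical(1.0).to_bits(), canonical(-1.0).to_bits());
}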
diff --git a/crates/polars-compute/src/cardinality.rs b/crates/polars-compute/src/cardinality.rs
new file mode 100644
index 000000000000..d28efa9d051e
--- /dev/null
+++ b/crates/polars-compute/src/cardinality.rs
@@ -0,0 +1,159 @@
+use arrow::array::{
+    Array, BinaryArray, BinaryViewArray, BooleanArray, FixedSizeBinaryArray, PrimitiveArray,
+    Utf8Array, Utf8ViewArray,
+};
+use arrow::datatypes::PhysicalType;
+use arrow::types::Offset;
+use arrow::with_match_primitive_type_full;
+use polars_utils::total_ord::ToTotalOrd;
+
+use crate::hyperloglogplus::HyperLogLog;
+
+/// Get an estimate for the *cardinality* of the array (i.e. the number of unique values)
+///
+/// This is not currently implemented for nested types.
+pub fn estimate_cardinality(array: &dyn Array) -> usize {
+    if array.is_empty() {
+        return 0;
+    }
+
+    if array.null_count() == array.len() {
+        return 1;
+    }
+
+    // Estimate the cardinality with HyperLogLog
+    use PhysicalType as PT;
+    match array.dtype().to_physical_type() {
+        PT::Null => 1,
+
+        PT::Boolean => {
+            let mut cardinality = 0;
+
+            let array = array.as_any().downcast_ref::<BooleanArray>().unwrap();
+
+            cardinality += usize::from(array.has_nulls());
+
+            if let Some(unset_bits) = array.values().lazy_unset_bits() {
+                cardinality += 1 + usize::from(unset_bits != array.len());
+            } else {
+                cardinality += 2;
+            }
+
+            cardinality
+        },
+
+        PT::Primitive(primitive_type) => with_match_primitive_type_full!(primitive_type, |$T| {
+            let mut hll = HyperLogLog::new();
+
+            let array = array
+                .as_any()
+                .downcast_ref::<PrimitiveArray<$T>>()
+                .unwrap();
+
+            if array.has_nulls() {
+                for v in array.iter() {
+                    let v = v.copied().unwrap_or_default();
+                    hll.add(&v.to_total_ord());
+                }
+            } else {
+                for v in array.values_iter() {
+                    hll.add(&v.to_total_ord());
+                }
+            }
+
+            hll.count()
+        }),
+        PT::FixedSizeBinary => {
+            let mut hll = HyperLogLog::new();
+
+            let array = array
+                .as_any()
+                .downcast_ref::<FixedSizeBinaryArray>()
+                .unwrap();
+
+            if array.has_nulls() {
+                for v in array.iter() {
+                    let v = v.unwrap_or_default();
+                    hll.add(v);
+                }
+            } else {
+                for v in array.values_iter() {
+                    hll.add(v);
+                }
+            }
+
+            hll.count()
+        },
+        PT::Binary => {
+            binary_offset_array_estimate(array.as_any().downcast_ref::<BinaryArray<i32>>().unwrap())
+        },
+        PT::LargeBinary => {
+            binary_offset_array_estimate(array.as_any().downcast_ref::<BinaryArray<i64>>().unwrap())
+        },
+        PT::Utf8 => binary_offset_array_estimate(
+            &array
+                .as_any()
+                .downcast_ref::<Utf8Array<i32>>()
+                .unwrap()
+                .to_binary(),
+        ),
+        PT::LargeUtf8 => binary_offset_array_estimate(
+            &array
+                .as_any()
+                .downcast_ref::<Utf8Array<i64>>()
+                .unwrap()
+                .to_binary(),
+        ),
+        PT::BinaryView => {
+            binary_view_array_estimate(array.as_any().downcast_ref::<BinaryViewArray>().unwrap())
+        },
+        PT::Utf8View => binary_view_array_estimate(
+            &array
+                .as_any()
+                .downcast_ref::<Utf8ViewArray>()
+                .unwrap()
+                .to_binview(),
+        ),
+        PT::List => unimplemented!(),
+        PT::FixedSizeList => unimplemented!(),
+        PT::LargeList => unimplemented!(),
+        PT::Struct => unimplemented!(),
+        PT::Union => unimplemented!(),
+        PT::Map => unimplemented!(),
+        PT::Dictionary(_) => unimplemented!(),
+    }
+}
+
+fn binary_offset_array_estimate<O: Offset>(array: &BinaryArray<O>) -> usize {
+    let mut hll = HyperLogLog::new();
+
+    if array.has_nulls() {
+        for v in array.iter() {
+            let v = v.unwrap_or_default();
+            hll.add(v);
+        }
+    } else {
+        for v in array.values_iter() {
+            hll.add(v);
+        }
+    }
+
+    hll.count()
+}
+
+fn binary_view_array_estimate(array: &BinaryViewArray) -> usize {
+    let mut hll = HyperLogLog::new();
+
+    if array.has_nulls() {
+        for v in array.iter() {
+            let v = v.unwrap_or_default();
+            hll.add(v);
+        }
+    } else {
+        for v in array.values_iter() {
+            hll.add(v);
+        }
+    }
+
+    hll.count()
+}
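This estimate feeds the Parquet writer change further down: a full dictionary (group-by) encoding is only attempted when the estimated number of distinct values is small relative to the array. A std-only sketch of that gate (hypothetical name; the 128/0.75 thresholds are the ones used in the writer change below):

// Returns whether a dictionary-encoding attempt looks worthwhile.
fn worth_dictionary_encoding(len: usize, estimated_cardinality: usize) -> bool {
    len <= 128 || (estimated_cardinality as f64) / (len as f64) <= 0.75
}

fn main() {
    assert!(worth_dictionary_encoding(100, 100)); // small arrays: always try
    assert!(worth_dictionary_encoding(10_000, 500)); // low cardinality: encode
    assert!(!worth_dictionary_encoding(10_000, 9_000)); // mostly unique: skip
}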
diff --git a/crates/polars-compute/src/lib.rs b/crates/polars-compute/src/lib.rs
index da56c65983db..30efdd59adc7 100644
--- a/crates/polars-compute/src/lib.rs
+++ b/crates/polars-compute/src/lib.rs
@@ -10,6 +10,8 @@ use arrow::types::NativeType;
 pub mod arithmetic;
 pub mod arity;
 pub mod bitwise;
+#[cfg(feature = "approx_unique")]
+pub mod cardinality;
 pub mod comparisons;
 pub mod filter;
 pub mod float_sum;
diff --git a/crates/polars-parquet/Cargo.toml b/crates/polars-parquet/Cargo.toml
index 26a57b22e713..881c9a477398 100644
--- a/crates/polars-parquet/Cargo.toml
+++ b/crates/polars-parquet/Cargo.toml
@@ -22,7 +22,7 @@ fallible-streaming-iterator = { workspace = true, optional = true }
 futures = { workspace = true, optional = true }
 hashbrown = { workspace = true }
 num-traits = { workspace = true }
-polars-compute = { workspace = true }
+polars-compute = { workspace = true, features = ["approx_unique"] }
 polars-error = { workspace = true }
 polars-utils = { workspace = true, features = ["mmap"] }
 simdutf8 = { workspace = true }
diff --git a/crates/polars-parquet/src/arrow/write/dictionary.rs b/crates/polars-parquet/src/arrow/write/dictionary.rs
index 4e0d57302314..17527fc488f7 100644
--- a/crates/polars-parquet/src/arrow/write/dictionary.rs
+++ b/crates/polars-parquet/src/arrow/write/dictionary.rs
@@ -3,11 +3,12 @@ use arrow::array::{
 };
 use arrow::bitmap::{Bitmap, MutableBitmap};
 use arrow::buffer::Buffer;
-use arrow::datatypes::{ArrowDataType, IntegerType};
+use arrow::datatypes::{ArrowDataType, IntegerType, PhysicalType};
+use arrow::legacy::utils::CustomIterTools;
+use arrow::trusted_len::TrustMyLength;
 use arrow::types::NativeType;
 use polars_compute::min_max::MinMaxKernel;
 use polars_error::{polars_bail, PolarsResult};
-use polars_utils::unwrap::UnwrapUncheckedRelease;
 
 use super::binary::{
     build_statistics as binary_build_statistics, encode_plain as binary_encode_plain,
@@ -31,33 +32,51 @@ use crate::parquet::CowBuffer;
 use crate::write::DynIter;
 
 trait MinMaxThreshold {
-    const DELTA_THRESHOLD: Self;
+    const DELTA_THRESHOLD: usize;
+    const BITMASK_THRESHOLD: usize;
+
+    fn from_start_and_offset(start: Self, offset: usize) -> Self;
 }
 
 macro_rules! minmaxthreshold_impls {
-    ($($t:ty => $threshold:literal,)+) => {
+    ($($signed:ty, $unsigned:ty => $threshold:literal, $bm_threshold:expr,)+) => {
         $(
-            impl MinMaxThreshold for $t {
-                const DELTA_THRESHOLD: Self = $threshold;
+            impl MinMaxThreshold for $signed {
+                const DELTA_THRESHOLD: usize = $threshold;
+                const BITMASK_THRESHOLD: usize = $bm_threshold;
+
+                fn from_start_and_offset(start: Self, offset: usize) -> Self {
+                    start + ((offset as $unsigned) as $signed)
+                }
+            }
+            impl MinMaxThreshold for $unsigned {
+                const DELTA_THRESHOLD: usize = $threshold;
+                const BITMASK_THRESHOLD: usize = $bm_threshold;
+
+                fn from_start_and_offset(start: Self, offset: usize) -> Self {
+                    start + (offset as $unsigned)
+                }
             }
         )+
     };
 }
 
 minmaxthreshold_impls! {
-    i8 => 16,
-    i16 => 256,
-    i32 => 512,
-    i64 => 2048,
-    u8 => 16,
-    u16 => 256,
-    u32 => 512,
-    u64 => 2048,
+    i8, u8 => 16, u8::MAX as usize,
+    i16, u16 => 256, u16::MAX as usize,
+    i32, u32 => 512, u16::MAX as usize,
+    i64, u64 => 2048, u16::MAX as usize,
+}
+
+enum DictionaryDecision {
+    NotWorth,
+    TryAgain,
+    Found(DictionaryArray<u32>),
 }
 
 fn min_max_integer_encode_as_dictionary_optional<'a, E, T>(
     array: &'a dyn Array,
-) -> Option<DictionaryArray<u32>>
+) -> DictionaryDecision
 where
     E: std::fmt::Debug,
     T: NativeType
@@ -65,26 +84,82 @@ where
         + std::cmp::Ord
        + TryInto<u32, Error = E>
        + std::ops::Sub<Output = T>
-        + num_traits::CheckedSub,
+        + num_traits::CheckedSub
+        + num_traits::cast::AsPrimitive<usize>,
     std::ops::RangeInclusive<T>: Iterator<Item = T>,
     PrimitiveArray<T>: MinMaxKernel<Scalar<'a> = T>,
 {
-    use ArrowDataType as DT;
-    let (min, max): (T, T) = <PrimitiveArray<T> as MinMaxKernel>::min_max_ignore_nan_kernel(
+    let min_max = <PrimitiveArray<T> as MinMaxKernel>::min_max_ignore_nan_kernel(
         array.as_any().downcast_ref().unwrap(),
-    )?;
+    );
+
+    let Some((min, max)) = min_max else {
+        return DictionaryDecision::TryAgain;
+    };
 
     debug_assert!(max >= min, "{max} >= {min}");
 
-    if !max
-        .checked_sub(&min)
-        .is_some_and(|v| v <= T::DELTA_THRESHOLD)
-    {
-        return None;
+    let Some(diff) = max.checked_sub(&min) else {
+        return DictionaryDecision::TryAgain;
+    };
+
+    let diff = diff.as_();
+
+    if diff > T::BITMASK_THRESHOLD {
+        return DictionaryDecision::TryAgain;
+    }
+
+    let mut seen_mask = MutableBitmap::from_len_zeroed(diff + 1);
+
+    let array = array.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();
+
+    if array.has_nulls() {
+        for v in array.non_null_values_iter() {
+            let offset = (v - min).as_();
+            debug_assert!(offset <= diff);
+
+            unsafe {
+                seen_mask.set_unchecked(offset, true);
+            }
+        }
+    } else {
+        for v in array.values_iter() {
+            let offset = (*v - min).as_();
+            debug_assert!(offset <= diff);
+
+            unsafe {
+                seen_mask.set_unchecked(offset, true);
+            }
+        }
     }
 
-    // @TODO: This currently overestimates the values, it might be interesting to use the unique
-    // kernel here.
-    let values = PrimitiveArray::new(DT::from(T::PRIMITIVE), (min..=max).collect(), None);
+    let cardinality = seen_mask.set_bits();
+
+    let mut is_worth_it = false;
+
+    is_worth_it |= cardinality <= T::DELTA_THRESHOLD;
+    is_worth_it |= (cardinality as f64) / (array.len() as f64) < 0.75;
+
+    if !is_worth_it {
+        return DictionaryDecision::NotWorth;
+    }
+
+    let seen_mask = seen_mask.freeze();
+
+    // SAFETY: We just did the calculation for this.
+    let indexes = seen_mask
+        .true_idx_iter()
+        .map(|idx| T::from_start_and_offset(min, idx));
+    let indexes = unsafe { TrustMyLength::new(indexes, cardinality) };
+    let indexes = indexes.collect_trusted::<Vec<_>>();
+
+    let mut lookup = vec![0u16; diff + 1];
+
+    for (i, &idx) in indexes.iter().enumerate() {
+        lookup[(idx - min).as_()] = i as u16;
+    }
+
+    use ArrowDataType as DT;
+    let values = PrimitiveArray::new(DT::from(T::PRIMITIVE), indexes.into(), None);
     let values = Box::new(values);
 
     let keys: Buffer<u32> = array
@@ -93,20 +168,19 @@ where
         .unwrap()
         .values()
         .iter()
-        .map(|v| unsafe {
+        .map(|v| {
             // @NOTE:
             // Since the values might contain nulls which have a undefined value. We just
             // clamp the values to between the min and max value. This way, they will still
-            // be valid dictionary keys. This is mostly to make the
-            // unwrap_unchecked_release not produce any unsafety.
-            (*v.clamp(&min, &max) - min)
-                .try_into()
-                .unwrap_unchecked_release()
+            // be valid dictionary keys.
+            let idx = *v.clamp(&min, &max) - min;
+            let value = unsafe { lookup.get_unchecked(idx.as_()) };
+            (*value).into()
         })
         .collect();
 
     let keys = PrimitiveArray::new(DT::UInt32, keys, array.validity().cloned());
 
-    Some(
+    DictionaryDecision::Found(
         DictionaryArray::<u32>::try_new(
             ArrowDataType::Dictionary(
                 IntegerType::UInt32,
@@ -126,26 +200,15 @@ pub(crate) fn encode_as_dictionary_optional(
     type_: PrimitiveType,
     options: WriteOptions,
 ) -> Option<PolarsResult<DynIter<'static, PolarsResult<Page>>>> {
-    use ArrowDataType as DT;
-    let fast_dictionary = match array.dtype() {
-        DT::Int8 => min_max_integer_encode_as_dictionary_optional::<_, i8>(array),
-        DT::Int16 => min_max_integer_encode_as_dictionary_optional::<_, i16>(array),
-        DT::Int32 | DT::Date32 | DT::Time32(_) => {
-            min_max_integer_encode_as_dictionary_optional::<_, i32>(array)
-        },
-        DT::Int64 | DT::Date64 | DT::Time64(_) | DT::Timestamp(_, _) | DT::Duration(_) => {
-            min_max_integer_encode_as_dictionary_optional::<_, i64>(array)
-        },
-        DT::UInt8 => min_max_integer_encode_as_dictionary_optional::<_, u8>(array),
-        DT::UInt16 => min_max_integer_encode_as_dictionary_optional::<_, u16>(array),
-        DT::UInt32 => min_max_integer_encode_as_dictionary_optional::<_, u32>(array),
-        DT::UInt64 => min_max_integer_encode_as_dictionary_optional::<_, u64>(array),
-        _ => None,
-    };
+    if array.is_empty() {
+        let array = DictionaryArray::<u32>::new_empty(ArrowDataType::Dictionary(
+            IntegerType::UInt32,
+            Box::new(array.dtype().clone()),
+            false, // @TODO: This might be able to be set to true?
+        ));
 
-    if let Some(fast_dictionary) = fast_dictionary {
         return Some(array_to_pages(
-            &fast_dictionary,
+            &array,
             type_,
             nested,
             options,
@@ -153,9 +216,44 @@ pub(crate) fn encode_as_dictionary_optional(
         ));
     }
 
+    use arrow::types::PrimitiveType as PT;
+    let fast_dictionary = match array.dtype().to_physical_type() {
+        PhysicalType::Primitive(pt) => match pt {
+            PT::Int8 => min_max_integer_encode_as_dictionary_optional::<_, i8>(array),
+            PT::Int16 => min_max_integer_encode_as_dictionary_optional::<_, i16>(array),
+            PT::Int32 => min_max_integer_encode_as_dictionary_optional::<_, i32>(array),
+            PT::Int64 => min_max_integer_encode_as_dictionary_optional::<_, i64>(array),
+            PT::UInt8 => min_max_integer_encode_as_dictionary_optional::<_, u8>(array),
+            PT::UInt16 => min_max_integer_encode_as_dictionary_optional::<_, u16>(array),
+            PT::UInt32 => min_max_integer_encode_as_dictionary_optional::<_, u32>(array),
+            PT::UInt64 => min_max_integer_encode_as_dictionary_optional::<_, u64>(array),
+            _ => DictionaryDecision::TryAgain,
+        },
+        _ => DictionaryDecision::TryAgain,
+    };
+
+    match fast_dictionary {
+        DictionaryDecision::NotWorth => return None,
+        DictionaryDecision::Found(dictionary_array) => {
+            return Some(array_to_pages(
+                &dictionary_array,
+                type_,
+                nested,
+                options,
+                Encoding::RleDictionary,
+            ))
+        },
+        DictionaryDecision::TryAgain => {},
+    }
+
     let dtype = Box::new(array.dtype().clone());
 
-    let len_before = array.len();
+    let estimated_cardinality = polars_compute::cardinality::estimate_cardinality(array);
+
+    if array.len() > 128 && (estimated_cardinality as f64) / (array.len() as f64) > 0.75 {
+        return None;
+    }
+
     // This does the group by.
     let array = arrow::compute::cast::cast(
         array,
@@ -169,10 +267,6 @@ pub(crate) fn encode_as_dictionary_optional(
         .downcast_ref::<DictionaryArray<u32>>()
         .unwrap();
 
-    if (array.values().len() as f64) / (len_before as f64) > 0.75 {
-        return None;
-    }
-
     Some(array_to_pages(
         array,
         type_,
diff --git a/crates/polars-parquet/src/parquet/read/page/reader.rs b/crates/polars-parquet/src/parquet/read/page/reader.rs
index cd23af0499d7..ad453a0ff50a 100644
--- a/crates/polars-parquet/src/parquet/read/page/reader.rs
+++ b/crates/polars-parquet/src/parquet/read/page/reader.rs
@@ -13,6 +13,7 @@ use crate::parquet::page::{
     ParquetPageHeader,
 };
 use crate::parquet::CowBuffer;
+use crate::write::Encoding;
 
 /// This meta is a small part of [`ColumnChunkMetadata`].
 #[derive(Debug, Clone, PartialEq, Eq)]
@@ -251,7 +252,10 @@ pub(super) fn finish_page(
             })?;
 
             if do_verbose {
-                println!("DictPage ( )");
+                eprintln!(
+                    "Parquet DictPage ( num_values: {}, datatype: {:?} )",
+                    dict_header.num_values, descriptor.primitive_type
+                );
             }
 
             let is_sorted = dict_header.is_sorted.unwrap_or(false);
@@ -275,9 +279,11 @@ pub(super) fn finish_page(
             })?;
 
             if do_verbose {
-                println!(
-                    "DataPageV1 ( num_values: {}, datatype: {:?}, encoding: {:?} )",
-                    header.num_values, descriptor.primitive_type, header.encoding
+                eprintln!(
+                    "Parquet DataPageV1 ( num_values: {}, datatype: {:?}, encoding: {:?} )",
+                    header.num_values,
+                    descriptor.primitive_type,
+                    Encoding::try_from(header.encoding).ok()
                 );
             }
 
@@ -298,8 +304,10 @@ pub(super) fn finish_page(
 
             if do_verbose {
                 println!(
-                    "DataPageV2 ( num_values: {}, datatype: {:?}, encoding: {:?} )",
-                    header.num_values, descriptor.primitive_type, header.encoding
+                    "Parquet DataPageV2 ( num_values: {}, datatype: {:?}, encoding: {:?} )",
+                    header.num_values,
+                    descriptor.primitive_type,
+                    Encoding::try_from(header.encoding).ok()
                 );
             }
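The bounded-range fast path added in dictionary.rs (seen-bitmask over min..=max, unique values as the dictionary, lookup table, key remapping) reduces to the following std-only sketch. `bounded_range_dictionary` is a hypothetical name and i32-only; the real code works on Arrow arrays, handles validity, and clamps null slots:

// Build (dictionary values, keys) for integers whose min..=max span is small.
fn bounded_range_dictionary(values: &[i32]) -> Option<(Vec<i32>, Vec<u32>)> {
    let (&min, &max) = (values.iter().min()?, values.iter().max()?);
    let diff = usize::try_from(i64::from(max) - i64::from(min)).ok()?;
    if diff > u16::MAX as usize {
        return None; // range too wide for the bitmask approach
    }

    // Mark which offsets in the value range actually occur.
    let mut seen = vec![false; diff + 1];
    for &v in values {
        seen[(v - min) as usize] = true;
    }

    // Dictionary values plus a lookup table from offset to dictionary key.
    let mut dict = Vec::new();
    let mut lookup = vec![0u32; diff + 1];
    for offset in (0..=diff).filter(|&o| seen[o]) {
        lookup[offset] = dict.len() as u32;
        dict.push(min + offset as i32);
    }

    let keys = values.iter().map(|&v| lookup[(v - min) as usize]).collect();
    Some((dict, keys))
}

fn main() {
    let (dict, keys) = bounded_range_dictionary(&[7, 3, 7, 9, 3]).unwrap();
    assert_eq!(dict, vec![3, 7, 9]);
    assert_eq!(keys, vec![1, 0, 1, 2, 0]);
}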