Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow Index Column of Type Duration in DataFrame.rolling #15999

Closed
wants to merge 27 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
52d0792
feat: Allow rolling windows over duration columns.
Nennia May 1, 2024
fb7430e
feat: Allow rolling windows over duration columns.
Nennia May 1, 2024
9b53a23
Merge branch 'pola-rs:main' into master
AlexanderNenninger May 1, 2024
6884a8c
fix(python): Add types to test and use Python 3.8 compatible string l…
Nennia May 1, 2024
62aebcb
fix(python): Python3.8 compatible f-strings.
Nennia May 1, 2024
aabdf56
fix(python): formatting
Nennia May 1, 2024
b7d9045
fix(python): retrigger CI
Nennia May 1, 2024
07bc9c8
fix(python): formatting
Nennia May 1, 2024
c5d92ec
feat: Additional `uint` datatype support for the SQL interface (#15993)
alexander-beedie May 2, 2024
1498a37
fix: Crash/incorrect group_by/n_unique on categoricals created by (q)…
nameexhaustion May 2, 2024
cbbb8c8
test(python): Improve hypothesis strategy for decimals (#16001)
stinodego May 2, 2024
63a2af6
docs(python): Improve user-guide doc of UDF (#15923)
May 2, 2024
aa46fef
refactor: Add some comments (#16008)
ritchie46 May 2, 2024
f0d81b7
feat: Improve dynamic supertypes (#16009)
ritchie46 May 2, 2024
d6306be
docs(python): correct default in rolling_* function examples (#16000)
MarcoGorelli May 2, 2024
19e7548
fix: Fix CSE case where upper plan has no projection (#16011)
ritchie46 May 2, 2024
24c8fe1
fix: properly handle nulls in DictionaryArray::iter_typed (#16013)
orlp May 2, 2024
28373f0
docs(python): Remove unwanted linebreaks from docstrings (#16002)
bertiewooster May 2, 2024
31a019f
feat: Convert concat during IR conversion (#16016)
ritchie46 May 2, 2024
4f8e1dc
feat: raise more informative error messages in rolling_* aggregations…
MarcoGorelli May 2, 2024
6ca8f79
refactor: Use UnionArgs for DSL side (#16017)
ritchie46 May 3, 2024
267e5b6
docs(python): Update reference to `apply` (#15982)
avimallu May 3, 2024
5cc6b27
fix(python): formatting
Nennia May 3, 2024
f66dba1
feat: Allow rolling windows over duration columns.
Nennia May 1, 2024
3008b1d
fix(python): Add types to test and use Python 3.8 compatible string l…
Nennia May 1, 2024
4276b70
fix(python): retrigger CI
Nennia May 1, 2024
396fd23
feat: raise more informative error messages in rolling_* aggregations…
MarcoGorelli May 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 8 additions & 14 deletions crates/polars-arrow/src/array/dictionary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ use polars_error::{polars_bail, PolarsResult};
use super::primitive::PrimitiveArray;
use super::specification::check_indexes;
use super::{new_empty_array, new_null_array, Array};
use crate::array::dictionary::typed_iterator::{DictValue, DictionaryValuesIterTyped};
use crate::array::dictionary::typed_iterator::{
DictValue, DictionaryIterTyped, DictionaryValuesIterTyped,
};

/// Trait denoting [`NativeType`]s that can be used as keys of a dictionary.
/// # Safety
Expand Down Expand Up @@ -241,30 +243,22 @@ impl<K: DictionaryKey> DictionaryArray<K> {
///
/// # Panics
///
/// Panics if the keys of this [`DictionaryArray`] have any null types.
/// If they do [`DictionaryArray::iter_typed`] should be called
Panics if the keys of this [`DictionaryArray`] have any nulls.
If they do, [`DictionaryArray::iter_typed`] should be used instead.
pub fn values_iter_typed<V: DictValue>(&self) -> PolarsResult<DictionaryValuesIterTyped<K, V>> {
let keys = &self.keys;
assert_eq!(keys.null_count(), 0);
let values = self.values.as_ref();
let values = V::downcast_values(values)?;
Ok(unsafe { DictionaryValuesIterTyped::new(keys, values) })
Ok(DictionaryValuesIterTyped::new(keys, values))
}

/// Returns an iterator over the optional values of [`Option<V::IterValue>`].
///
/// # Panics
///
/// This function panics if the `values` array
pub fn iter_typed<V: DictValue>(
&self,
) -> PolarsResult<ZipValidity<V::IterValue<'_>, DictionaryValuesIterTyped<K, V>, BitmapIter>>
{
pub fn iter_typed<V: DictValue>(&self) -> PolarsResult<DictionaryIterTyped<K, V>> {
let keys = &self.keys;
let values = self.values.as_ref();
let values = V::downcast_values(values)?;
let values_iter = unsafe { DictionaryValuesIterTyped::new(keys, values) };
Ok(ZipValidity::new_with_validity(values_iter, self.validity()))
Ok(DictionaryIterTyped::new(keys, values))
}

/// Returns the [`ArrowDataType`] of this [`DictionaryArray`]
Expand Down
70 changes: 68 additions & 2 deletions crates/polars-arrow/src/array/dictionary/typed_iterator.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use polars_error::{polars_err, PolarsResult};

use super::DictionaryKey;
use crate::array::{Array, PrimitiveArray, Utf8Array, Utf8ViewArray};
use crate::array::{Array, PrimitiveArray, StaticArray, Utf8Array, Utf8ViewArray};
use crate::trusted_len::TrustedLen;
use crate::types::Offset;

Expand Down Expand Up @@ -85,7 +85,8 @@ pub struct DictionaryValuesIterTyped<'a, K: DictionaryKey, V: DictValue> {
}

impl<'a, K: DictionaryKey, V: DictValue> DictionaryValuesIterTyped<'a, K, V> {
pub(super) unsafe fn new(keys: &'a PrimitiveArray<K>, values: &'a V) -> Self {
pub(super) fn new(keys: &'a PrimitiveArray<K>, values: &'a V) -> Self {
assert_eq!(keys.null_count(), 0);
Self {
keys,
values,
Expand Down Expand Up @@ -137,3 +138,68 @@ impl<'a, K: DictionaryKey, V: DictValue> DoubleEndedIterator
}
}
}

/// Iterator over a dictionary's entries that yields `Option<V::IterValue>`:
/// `None` for entries whose key is null, otherwise the dictionary value the
/// key points to. Unlike `DictionaryValuesIterTyped`, this handles null keys
/// and therefore does not require a null-free keys array.
pub struct DictionaryIterTyped<'a, K: DictionaryKey, V: DictValue> {
    // Keys array; may contain nulls (yielded as `None`).
    keys: &'a PrimitiveArray<K>,
    // Dictionary values; non-null keys index into this.
    values: &'a V,
    // Next position to yield from the front.
    index: usize,
    // One past the last position to yield from the back.
    end: usize,
}

impl<'a, K: DictionaryKey, V: DictValue> DictionaryIterTyped<'a, K, V> {
    /// Creates an iterator over all `keys.len()` entries of the dictionary,
    /// yielding `None` for null keys.
    pub(super) fn new(keys: &'a PrimitiveArray<K>, values: &'a V) -> Self {
        let end = keys.len();
        Self {
            keys,
            values,
            index: 0,
            end,
        }
    }
}

impl<'a, K: DictionaryKey, V: DictValue> Iterator for DictionaryIterTyped<'a, K, V> {
    type Item = Option<V::IterValue<'a>>;

    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        // Exhausted when the front cursor meets the back cursor.
        if self.index == self.end {
            return None;
        }
        let old = self.index;
        self.index += 1;
        // SAFETY: `old < self.end <= keys.len()`, so the key access is in
        // bounds. The value access assumes every non-null key is a valid
        // index into `values` — presumably upheld when the dictionary array
        // was constructed (TODO confirm: enforced via `check_indexes`
        // elsewhere in this crate).
        unsafe {
            if let Some(key) = self.keys.get_unchecked(old) {
                let idx = key.as_usize();
                Some(Some(self.values.get_unchecked(idx)))
            } else {
                // Null key: yield an explicit `None` element.
                Some(None)
            }
        }
    }

    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        // Exact remaining length; this exactness is what the `TrustedLen`
        // contract for this type relies on.
        (self.end - self.index, Some(self.end - self.index))
    }
}

// SAFETY: `size_hint` reports the exact number of remaining elements
// (`end - index`) with equal lower and upper bounds, as `TrustedLen` requires.
unsafe impl<'a, K: DictionaryKey, V: DictValue> TrustedLen for DictionaryIterTyped<'a, K, V> {}

impl<'a, K: DictionaryKey, V: DictValue> DoubleEndedIterator for DictionaryIterTyped<'a, K, V> {
    #[inline]
    fn next_back(&mut self) -> Option<Self::Item> {
        // Exhausted when the back cursor meets the front cursor.
        if self.index == self.end {
            None
        } else {
            // Move the back cursor first, then read the element at the new
            // `end` (mirrors `next`, which reads before advancing).
            self.end -= 1;
            // SAFETY: `self.index <= self.end < keys.len()`, so the key
            // access is in bounds. As in `next`, the value access assumes
            // every non-null key is a valid index into `values` — presumably
            // guaranteed at dictionary-array construction (TODO confirm).
            unsafe {
                if let Some(key) = self.keys.get_unchecked(self.end) {
                    let idx = key.as_usize();
                    Some(Some(self.values.get_unchecked(idx)))
                } else {
                    // Null key: yield an explicit `None` element.
                    Some(None)
                }
            }
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,10 @@ impl CategoricalChunked {
self
}

    /// Public wrapper around `with_fast_unique`, setting the fast-unique
    /// flag on this `CategoricalChunked`. Underscore-prefixed: presumably an
    /// internal API exposed for use outside this module, not intended for
    /// end users — TODO confirm against callers.
    pub fn _with_fast_unique(self, toggle: bool) -> Self {
        self.with_fast_unique(toggle)
    }

/// Get a reference to the mapping of categorical types to the string values.
pub fn get_rev_map(&self) -> &Arc<RevMapping> {
if let DataType::Categorical(Some(rev_map), _) | DataType::Enum(Some(rev_map), _) =
Expand Down
41 changes: 28 additions & 13 deletions crates/polars-core/src/utils/supertype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,28 +264,43 @@ pub fn get_supertype(l: &DataType, r: &DataType) -> Option<DataType> {
},
(dt, Unknown(kind)) => {
match kind {
// numeric vs float|str -> always float|str
UnknownKind::Float | UnknownKind::Int(_) if dt.is_float() | dt.is_string() => Some(dt.clone()),
UnknownKind::Float if dt.is_numeric() => Some(Unknown(UnknownKind::Float)),
UnknownKind::Float if dt.is_integer() => Some(Unknown(UnknownKind::Float)),
// Materialize float
UnknownKind::Float if dt.is_float() => Some(dt.clone()),
// Materialize str
UnknownKind::Str if dt.is_string() | dt.is_enum() => Some(dt.clone()),
// Materialize str
#[cfg(feature = "dtype-categorical")]
UnknownKind::Str if dt.is_categorical() => {
let Categorical(_, ord) = dt else { unreachable!()};
Some(Categorical(None, *ord))
},
// Keep unknown
dynam if dt.is_null() => Some(Unknown(*dynam)),
// Find integers sizes
UnknownKind::Int(v) if dt.is_numeric() => {
let smallest_fitting_dtype = if dt.is_unsigned_integer() && v.is_positive() {
materialize_dyn_int_pos(*v).dtype()
} else {
materialize_smallest_dyn_int(*v).dtype()
};
match dt {
UInt64 if smallest_fitting_dtype.is_signed_integer() => {
// Ensure we don't cast to float when dealing with dynamic literals
Some(Int64)
},
_ => {
get_supertype(dt, &smallest_fitting_dtype)
// Both dyn int
if let Unknown(UnknownKind::Int(v_other)) = dt {
// Take the maximum value to ensure we bubble up the required minimal size.
Some(Unknown(UnknownKind::Int(std::cmp::max(*v, *v_other))))
}
// dyn int vs number
else {
let smallest_fitting_dtype = if dt.is_unsigned_integer() && v.is_positive() {
materialize_dyn_int_pos(*v).dtype()
} else {
materialize_smallest_dyn_int(*v).dtype()
};
match dt {
UInt64 if smallest_fitting_dtype.is_signed_integer() => {
// Ensure we don't cast to float when dealing with dynamic literals
Some(Int64)
},
_ => {
get_supertype(dt, &smallest_fitting_dtype)
}
}
}
}
Expand Down
24 changes: 17 additions & 7 deletions crates/polars-io/src/csv/read/read_impl/batched_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use polars_core::frame::DataFrame;
use polars_core::schema::SchemaRef;
use polars_core::POOL;
use polars_error::PolarsResult;
use polars_utils::sync::SyncPtr;
use polars_utils::IdxSize;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};

Expand Down Expand Up @@ -54,6 +55,8 @@ pub(crate) fn get_offsets(
}
}

/// Reads bytes from `file` into `buf` and returns pointers into `buf` that can be parsed.
/// TODO! this can be implemented without copying by pointing into the memmapped file.
struct ChunkReader<'a> {
file: &'a File,
buf: Vec<u8>,
Expand Down Expand Up @@ -109,18 +112,23 @@ impl<'a> ChunkReader<'a> {
self.buf_end = 0;
}

fn return_slice(&self, start: usize, end: usize) -> (usize, usize) {
fn return_slice(&self, start: usize, end: usize) -> (SyncPtr<u8>, usize) {
let slice = &self.buf[start..end];
let len = slice.len();
(slice.as_ptr() as usize, len)
(slice.as_ptr().into(), len)
}

fn get_buf(&self) -> (usize, usize) {
fn get_buf_remaining(&self) -> (SyncPtr<u8>, usize) {
let slice = &self.buf[self.buf_end..];
let len = slice.len();
(slice.as_ptr() as usize, len)
(slice.as_ptr().into(), len)
}

// Get the next `n` offset positions, where `n` is the number of chunks.

// This returns pointers to slices into `buf`.
// We must process those slices before the next call,
// as that call will overwrite them.
fn read(&mut self, n: usize) -> bool {
self.reslice();

Expand Down Expand Up @@ -267,7 +275,7 @@ pub struct BatchedCsvReaderRead<'a> {
chunk_size: usize,
finished: bool,
file_chunk_reader: ChunkReader<'a>,
file_chunks: Vec<(usize, usize)>,
file_chunks: Vec<(SyncPtr<u8>, usize)>,
projection: Vec<usize>,
starting_point_offset: Option<usize>,
row_index: Option<RowIndex>,
Expand All @@ -292,6 +300,7 @@ pub struct BatchedCsvReaderRead<'a> {
}
//
impl<'a> BatchedCsvReaderRead<'a> {
/// `n` number of batches.
pub fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
if n == 0 || self.finished {
return Ok(None);
Expand Down Expand Up @@ -320,7 +329,8 @@ impl<'a> BatchedCsvReaderRead<'a> {
// ensure we process the final slice as well.
if self.file_chunk_reader.finished && self.file_chunks.len() < n {
// get the final slice
self.file_chunks.push(self.file_chunk_reader.get_buf());
self.file_chunks
.push(self.file_chunk_reader.get_buf_remaining());
self.finished = true
}

Expand All @@ -333,7 +343,7 @@ impl<'a> BatchedCsvReaderRead<'a> {
self.file_chunks
.par_iter()
.map(|(ptr, len)| {
let chunk = unsafe { std::slice::from_raw_parts(*ptr as *const u8, *len) };
let chunk = unsafe { std::slice::from_raw_parts(ptr.get(), *len) };
let stop_at_n_bytes = chunk.len();
let mut df = read_chunk(
chunk,
Expand Down
Loading