Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(rust)!: Rename all 'Chunk's to RecordBatch #16063

Merged
merged 2 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions crates/polars-arrow/src/io/avro/read/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use super::nested::*;
use super::util;
use crate::array::*;
use crate::datatypes::*;
use crate::record_batch::RecordBatch;
use crate::record_batch::RecordBatchT;
use crate::types::months_days_ns;
use crate::with_match_primitive_type;

Expand Down Expand Up @@ -461,7 +461,7 @@ fn skip_item<'a>(
Ok(block)
}

/// Deserializes a [`Block`] assumed to be encoded according to [`AvroField`] into [`RecordBatch`],
/// Deserializes a [`Block`] assumed to be encoded according to [`AvroField`] into [`RecordBatchT`],
/// using `projection` to ignore `avro_fields`.
/// # Panics
/// `fields`, `avro_fields` and `projection` must have the same length.
Expand All @@ -470,7 +470,7 @@ pub fn deserialize(
fields: &[Field],
avro_fields: &[AvroField],
projection: &[bool],
) -> PolarsResult<RecordBatch<Box<dyn Array>>> {
) -> PolarsResult<RecordBatchT<Box<dyn Array>>> {
assert_eq!(fields.len(), avro_fields.len());
assert_eq!(fields.len(), projection.len());

Expand Down Expand Up @@ -508,7 +508,7 @@ pub fn deserialize(
}?
}
}
RecordBatch::try_new(
RecordBatchT::try_new(
arrays
.iter_mut()
.zip(projection.iter())
Expand Down
6 changes: 3 additions & 3 deletions crates/polars-arrow/src/io/avro/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ pub use schema::infer_schema;

use crate::array::Array;
use crate::datatypes::Field;
use crate::record_batch::RecordBatch;
use crate::record_batch::RecordBatchT;

/// Single threaded, blocking reader of Avro; [`Iterator`] of [`RecordBatch`].
/// Single threaded, blocking reader of Avro; [`Iterator`] of [`RecordBatchT`].
pub struct Reader<R: Read> {
iter: BlockStreamingIterator<R>,
avro_fields: Vec<AvroField>,
Expand Down Expand Up @@ -53,7 +53,7 @@ impl<R: Read> Reader<R> {
}

impl<R: Read> Iterator for Reader<R> {
type Item = PolarsResult<RecordBatch<Box<dyn Array>>>;
type Item = PolarsResult<RecordBatchT<Box<dyn Array>>>;

fn next(&mut self) -> Option<Self::Item> {
let fields = &self.fields[..];
Expand Down
14 changes: 7 additions & 7 deletions crates/polars-arrow/src/io/flight/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ use crate::datatypes::*;
pub use crate::io::ipc::write::common::WriteOptions;
use crate::io::ipc::write::common::{encode_chunk, DictionaryTracker, EncodedData};
use crate::io::ipc::{read, write};
use crate::record_batch::RecordBatch;
use crate::record_batch::RecordBatchT;

/// Serializes [`RecordBatch`] to a vector of [`FlightData`] representing the serialized dictionaries
/// Serializes [`RecordBatchT`] to a vector of [`FlightData`] representing the serialized dictionaries
/// and a [`FlightData`] representing the batch.
/// # Errors
/// This function errors iff `fields` is not consistent with `columns`
pub fn serialize_batch(
chunk: &RecordBatch<Box<dyn Array>>,
chunk: &RecordBatchT<Box<dyn Array>>,
fields: &[IpcField],
options: &WriteOptions,
) -> PolarsResult<(Vec<FlightData>, FlightData)> {
Expand Down Expand Up @@ -110,13 +110,13 @@ pub fn deserialize_schemas(bytes: &[u8]) -> PolarsResult<(ArrowSchema, IpcSchema
read::deserialize_schema(bytes)
}

/// Deserializes [`FlightData`] representing a record batch message to [`RecordBatch`].
/// Deserializes [`FlightData`] representing a record batch message to [`RecordBatchT`].
pub fn deserialize_batch(
data: &FlightData,
fields: &[Field],
ipc_schema: &IpcSchema,
dictionaries: &read::Dictionaries,
) -> PolarsResult<RecordBatch<Box<dyn Array>>> {
) -> PolarsResult<RecordBatchT<Box<dyn Array>>> {
// check that the data_header is a record batch message
let message = arrow_format::ipc::MessageRef::read_as_root(&data.data_header)
.map_err(|_err| polars_err!(oos = "Unable to get root as message: {err:?}"))?;
Expand Down Expand Up @@ -178,14 +178,14 @@ pub fn deserialize_dictionary(
Ok(())
}

/// Deserializes [`FlightData`] into either a [`RecordBatch`] (when the message is a record batch)
/// Deserializes [`FlightData`] into either a [`RecordBatchT`] (when the message is a record batch)
/// or by upserting into `dictionaries` (when the message is a dictionary)
pub fn deserialize_message(
data: &FlightData,
fields: &[Field],
ipc_schema: &IpcSchema,
dictionaries: &mut Dictionaries,
) -> PolarsResult<Option<RecordBatch<Box<dyn Array>>>> {
) -> PolarsResult<Option<RecordBatchT<Box<dyn Array>>>> {
let FlightData {
data_header,
data_body,
Expand Down
14 changes: 7 additions & 7 deletions crates/polars-arrow/src/io/ipc/read/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::array::*;
use crate::datatypes::{ArrowDataType, Field};
use crate::io::ipc::read::OutOfSpecKind;
use crate::io::ipc::{IpcField, IpcSchema};
use crate::record_batch::RecordBatch;
use crate::record_batch::RecordBatchT;

#[derive(Debug, Eq, PartialEq, Hash)]
enum ProjectionResult<A> {
Expand Down Expand Up @@ -70,7 +70,7 @@ impl<'a, A, I: Iterator<Item = A>> Iterator for ProjectionIter<'a, A, I> {
}
}

/// Returns a [`RecordBatch`] from a reader.
/// Returns a [`RecordBatchT`] from a reader.
/// # Panics
/// Panics iff the projection is not in strictly increasing order (e.g. neither `[1, 0]` nor `[0, 1, 1]` is valid)
#[allow(clippy::too_many_arguments)]
Expand All @@ -86,7 +86,7 @@ pub fn read_record_batch<R: Read + Seek>(
block_offset: u64,
file_size: u64,
scratch: &mut Vec<u8>,
) -> PolarsResult<RecordBatch<Box<dyn Array>>> {
) -> PolarsResult<RecordBatchT<Box<dyn Array>>> {
assert_eq!(fields.len(), ipc_schema.fields.len());
let buffers = batch
.buffers()
Expand Down Expand Up @@ -185,7 +185,7 @@ pub fn read_record_batch<R: Read + Seek>(
})
.collect::<PolarsResult<Vec<_>>>()?
};
RecordBatch::try_new(columns)
RecordBatchT::try_new(columns)
}

fn find_first_dict_field_d<'a>(
Expand Down Expand Up @@ -338,17 +338,17 @@ pub fn prepare_projection(
}

pub fn apply_projection(
chunk: RecordBatch<Box<dyn Array>>,
chunk: RecordBatchT<Box<dyn Array>>,
map: &AHashMap<usize, usize>,
) -> RecordBatch<Box<dyn Array>> {
) -> RecordBatchT<Box<dyn Array>> {
// re-order according to projection
let arrays = chunk.into_arrays();
let mut new_arrays = arrays.clone();

map.iter()
.for_each(|(old, new)| new_arrays[*new] = arrays[*old].clone());

RecordBatch::new(new_arrays)
RecordBatchT::new(new_arrays)
}

#[cfg(test)]
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-arrow/src/io/ipc/read/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use super::{Dictionaries, OutOfSpecKind};
use crate::array::Array;
use crate::datatypes::ArrowSchemaRef;
use crate::io::ipc::IpcSchema;
use crate::record_batch::RecordBatch;
use crate::record_batch::RecordBatchT;

/// Metadata of an Arrow IPC file, written in the footer of the file.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -299,7 +299,7 @@ pub fn read_batch<R: Read + Seek>(
index: usize,
message_scratch: &mut Vec<u8>,
data_scratch: &mut Vec<u8>,
) -> PolarsResult<RecordBatch<Box<dyn Array>>> {
) -> PolarsResult<RecordBatchT<Box<dyn Array>>> {
let block = metadata.blocks[index];

let offset: u64 = block
Expand Down
10 changes: 5 additions & 5 deletions crates/polars-arrow/src/io/ipc/read/file_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ use super::{Dictionaries, FileMetadata, OutOfSpecKind};
use crate::array::*;
use crate::datatypes::{ArrowSchema, Field};
use crate::io::ipc::{IpcSchema, ARROW_MAGIC_V2, CONTINUATION_MARKER};
use crate::record_batch::RecordBatch;
use crate::record_batch::RecordBatchT;

/// Async reader for Arrow IPC files
pub struct FileStream<'a> {
stream: BoxStream<'a, PolarsResult<RecordBatch<Box<dyn Array>>>>,
stream: BoxStream<'a, PolarsResult<RecordBatchT<Box<dyn Array>>>>,
schema: Option<ArrowSchema>,
metadata: FileMetadata,
}
Expand Down Expand Up @@ -72,7 +72,7 @@ impl<'a> FileStream<'a> {
metadata: FileMetadata,
projection: Option<(Vec<usize>, AHashMap<usize, usize>)>,
limit: Option<usize>,
) -> BoxStream<'a, PolarsResult<RecordBatch<Box<dyn Array>>>>
) -> BoxStream<'a, PolarsResult<RecordBatchT<Box<dyn Array>>>>
where
R: AsyncRead + AsyncSeek + Unpin + Send + 'a,
{
Expand Down Expand Up @@ -113,7 +113,7 @@ impl<'a> FileStream<'a> {
}

impl<'a> Stream for FileStream<'a> {
type Item = PolarsResult<RecordBatch<Box<dyn Array>>>;
type Item = PolarsResult<RecordBatchT<Box<dyn Array>>>;

fn poll_next(
self: std::pin::Pin<&mut Self>,
Expand Down Expand Up @@ -170,7 +170,7 @@ async fn read_batch<R>(
meta_buffer: &mut Vec<u8>,
block_buffer: &mut Vec<u8>,
scratch: &mut Vec<u8>,
) -> PolarsResult<RecordBatch<Box<dyn Array>>>
) -> PolarsResult<RecordBatchT<Box<dyn Array>>>
where
R: AsyncRead + AsyncSeek + Unpin,
{
Expand Down
6 changes: 3 additions & 3 deletions crates/polars-arrow/src/io/ipc/read/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ use super::common::*;
use super::{read_batch, read_file_dictionaries, Dictionaries, FileMetadata};
use crate::array::Array;
use crate::datatypes::ArrowSchema;
use crate::record_batch::RecordBatch;
use crate::record_batch::RecordBatchT;

/// An iterator of [`RecordBatch`]s from an Arrow IPC file.
/// An iterator of [`RecordBatchT`]s from an Arrow IPC file.
pub struct FileReader<R: Read + Seek> {
reader: R,
metadata: FileMetadata,
Expand Down Expand Up @@ -98,7 +98,7 @@ impl<R: Read + Seek> FileReader<R> {
}

impl<R: Read + Seek> Iterator for FileReader<R> {
type Item = PolarsResult<RecordBatch<Box<dyn Array>>>;
type Item = PolarsResult<RecordBatchT<Box<dyn Array>>>;

fn next(&mut self) -> Option<Self::Item> {
// get current block
Expand Down
6 changes: 3 additions & 3 deletions crates/polars-arrow/src/io/ipc/read/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::{Dictionaries, OutOfSpecKind};
use crate::array::Array;
use crate::datatypes::ArrowSchema;
use crate::io::ipc::IpcSchema;
use crate::record_batch::RecordBatch;
use crate::record_batch::RecordBatchT;

/// Metadata of an Arrow IPC stream, written at the start of the stream
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -67,7 +67,7 @@ pub enum StreamState {
/// A live stream without data
Waiting,
/// Next item in the stream
Some(RecordBatch<Box<dyn Array>>),
Some(RecordBatchT<Box<dyn Array>>),
}

impl StreamState {
Expand All @@ -76,7 +76,7 @@ impl StreamState {
/// # Panics
///
/// If the `StreamState` was `Waiting`.
pub fn unwrap(self) -> RecordBatch<Box<dyn Array>> {
pub fn unwrap(self) -> RecordBatchT<Box<dyn Array>> {
if let StreamState::Some(batch) = self {
batch
} else {
Expand Down
8 changes: 4 additions & 4 deletions crates/polars-arrow/src/io/ipc/read/stream_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use super::common::{read_dictionary, read_record_batch};
use super::schema::deserialize_stream_metadata;
use super::{Dictionaries, OutOfSpecKind, StreamMetadata};
use crate::array::*;
use crate::record_batch::RecordBatch;
use crate::record_batch::RecordBatchT;

/// A (private) state of stream messages
struct ReadState<R> {
Expand All @@ -28,7 +28,7 @@ enum StreamState<R> {
/// The stream does not contain new chunks (and it has not been closed)
Waiting(ReadState<R>),
/// The stream contains a new chunk
Some((ReadState<R>, RecordBatch<Box<dyn Array>>)),
Some((ReadState<R>, RecordBatchT<Box<dyn Array>>)),
}

/// Reads the [`StreamMetadata`] of the Arrow stream asynchronously
Expand Down Expand Up @@ -177,7 +177,7 @@ async fn maybe_next<R: AsyncRead + Unpin + Send>(
}
}

/// A [`Stream`] over an Arrow IPC stream that asynchronously yields [`RecordBatch`]s.
/// A [`Stream`] over an Arrow IPC stream that asynchronously yields [`RecordBatchT`]s.
pub struct AsyncStreamReader<'a, R: AsyncRead + Unpin + Send + 'a> {
metadata: StreamMetadata,
future: Option<BoxFuture<'a, PolarsResult<Option<StreamState<R>>>>>,
Expand All @@ -204,7 +204,7 @@ impl<'a, R: AsyncRead + Unpin + Send + 'a> AsyncStreamReader<'a, R> {
}

impl<'a, R: AsyncRead + Unpin + Send> Stream for AsyncStreamReader<'a, R> {
type Item = PolarsResult<RecordBatch<Box<dyn Array>>>;
type Item = PolarsResult<RecordBatchT<Box<dyn Array>>>;

fn poll_next(
self: std::pin::Pin<&mut Self>,
Expand Down
Loading
Loading