Skip to content

Commit

Permalink
couple more
Browse files Browse the repository at this point in the history
  • Loading branch information
kylebarron committed Dec 6, 2024
1 parent 10110b5 commit 27e3959
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 23 deletions.
17 changes: 7 additions & 10 deletions pyo3-arrow/src/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use pyo3::prelude::*;
use pyo3::types::{PyCapsule, PyTuple, PyType};

use crate::error::PyArrowResult;
use crate::export::Arro3Field;
use crate::export::{Arro3Array, Arro3ChunkedArray, Arro3Field};
use crate::ffi::from_python::ffi_stream::ArrowArrayStreamReader;
use crate::ffi::from_python::utils::import_stream_pycapsule;
use crate::ffi::to_python::nanoarrow::to_nanoarrow_array_stream;
Expand Down Expand Up @@ -140,8 +140,8 @@ impl PyArrayReader {
self.to_arro3(py)
}

fn __next__(&mut self, py: Python) -> PyArrowResult<PyObject> {
self.read_next_array(py)
fn __next__(&mut self) -> PyArrowResult<Arro3Array> {
self.read_next_array()
}

fn __repr__(&self) -> String {
Expand Down Expand Up @@ -190,7 +190,7 @@ impl PyArrayReader {
Ok(PyField::new(self.field_ref()?).into())
}

fn read_all(&mut self, py: Python) -> PyArrowResult<PyObject> {
fn read_all(&mut self) -> PyArrowResult<Arro3ChunkedArray> {
let stream = self
.0
.lock()
Expand All @@ -202,20 +202,17 @@ impl PyArrayReader {
for array in stream {
arrays.push(array?);
}
Ok(PyChunkedArray::try_new(arrays, field)?.to_arro3(py)?)
Ok(PyChunkedArray::try_new(arrays, field)?.into())
}

fn read_next_array(&mut self, py: Python) -> PyArrowResult<PyObject> {
fn read_next_array(&mut self) -> PyArrowResult<Arro3Array> {
let mut inner = self.0.lock().unwrap();
let stream = inner
.as_mut()
.ok_or(PyIOError::new_err("Cannot read from closed stream."))?;

if let Some(next_batch) = stream.next() {
Ok(PyArray::new(next_batch?, stream.field())
.to_arro3(py)?
.into_any()
.unbind())
Ok(PyArray::new(next_batch?, stream.field()).into())
} else {
Err(PyStopIteration::new_err("").into())
}
Expand Down
20 changes: 7 additions & 13 deletions pyo3-arrow/src/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use pyo3::types::{PyCapsule, PyTuple, PyType};
use pyo3::{intern, IntoPyObjectExt};

use crate::error::{PyArrowError, PyArrowResult};
use crate::export::{Arro3Array, Arro3DataType, Arro3Field};
use crate::export::{Arro3Array, Arro3ChunkedArray, Arro3DataType, Arro3Field};
use crate::ffi::from_python::ffi_stream::ArrowArrayStreamReader;
use crate::ffi::from_python::utils::import_stream_pycapsule;
use crate::ffi::to_python::chunked::ArrayIterator;
Expand Down Expand Up @@ -357,14 +357,14 @@ impl PyChunkedArray {
Self::from_arrow_pycapsule(capsule)
}

fn cast(&self, py: Python, target_type: PyField) -> PyArrowResult<PyObject> {
fn cast(&self, target_type: PyField) -> PyArrowResult<Arro3ChunkedArray> {
let new_field = target_type.into_inner();
let new_chunks = self
.chunks
.iter()
.map(|chunk| arrow::compute::cast(&chunk, new_field.data_type()))
.collect::<Result<Vec<_>, ArrowError>>()?;
Ok(PyChunkedArray::try_new(new_chunks, new_field)?.to_arro3(py)?)
Ok(PyChunkedArray::try_new(new_chunks, new_field)?.into())
}

fn chunk(&self, i: usize) -> PyResult<Arro3Array> {
Expand Down Expand Up @@ -428,7 +428,7 @@ impl PyChunkedArray {

#[pyo3(signature = (*, max_chunksize=None))]
#[pyo3(name = "rechunk")]
fn rechunk_py(&self, py: Python, max_chunksize: Option<usize>) -> PyArrowResult<PyObject> {
fn rechunk_py(&self, max_chunksize: Option<usize>) -> PyArrowResult<Arro3ChunkedArray> {
let max_chunksize = max_chunksize.unwrap_or(self.len());
let mut chunk_lengths = vec![];
let mut offset = 0;
Expand All @@ -437,20 +437,14 @@ impl PyChunkedArray {
offset += chunk_length;
chunk_lengths.push(chunk_length);
}
Ok(self.rechunk(chunk_lengths)?.to_arro3(py)?)
Ok(self.rechunk(chunk_lengths)?.into())
}

#[pyo3(signature = (offset=0, length=None))]
#[pyo3(name = "slice")]
fn slice_py(
&self,
py: Python,
offset: usize,
length: Option<usize>,
) -> PyArrowResult<PyObject> {
fn slice_py(&self, offset: usize, length: Option<usize>) -> PyArrowResult<Arro3ChunkedArray> {
let length = length.unwrap_or_else(|| self.len() - offset);
let sliced_chunked_array = self.slice(offset, length)?;
Ok(sliced_chunked_array.to_arro3(py)?)
Ok(self.slice(offset, length)?.into())
}

fn to_numpy<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
Expand Down

0 comments on commit 27e3959

Please sign in to comment.