From a14cc6c96abebd498d3e93c52831b241286157c4 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Wed, 24 Jul 2024 00:24:31 -0400 Subject: [PATCH] Wrap basic Parquet --- Cargo.lock | 220 +++++++++++++++++++++++++++++ Cargo.toml | 1 + arro3-io/Cargo.toml | 1 + arro3-io/python/arro3/io/_rust.pyi | 48 ++++++- arro3-io/src/lib.rs | 4 + arro3-io/src/parquet.rs | 37 +++++ 6 files changed, 309 insertions(+), 2 deletions(-) create mode 100644 arro3-io/src/parquet.rs diff --git a/Cargo.lock b/Cargo.lock index 05679d6..36bb505 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "ahash" version = "0.8.11" @@ -25,6 +31,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "alloc-no-stdlib" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3" + +[[package]] +name = "alloc-stdlib" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece" +dependencies = [ + "alloc-no-stdlib", +] + [[package]] name = "android-tzdata" version = "0.1.1" @@ -73,6 +94,7 @@ dependencies = [ "arrow-csv", "arrow-ipc", "arrow-schema", + "parquet", "pyo3", "pyo3-arrow", "pyo3-file", @@ -330,6 +352,27 @@ version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" +[[package]] +name = "brotli" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74f7971dbd9326d58187408ab83117d8ac1bb9c17b085fdacd1cf2f598719b6b" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + 
"brotli-decompressor", +] + +[[package]] +name = "brotli-decompressor" +version = "4.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a45bd2e4095a8b518033b128020dd4a55aab1c0a381ba4404a472630f4bc362" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", +] + [[package]] name = "bumpalo" version = "3.16.0" @@ -342,6 +385,12 @@ version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ce89b21cab1437276d2650d57e971f9d548a2d9037cc231abdc0562b97498ce" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.6.1" @@ -384,6 +433,10 @@ name = "cc" version = "1.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2aba8f4e9906c7ce3c73463f62a7f0c65183ada1a2d47e397cc8810827f9694f" +dependencies = [ + "jobserver", + "libc", +] [[package]] name = "cfg-if" @@ -429,6 +482,15 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" +[[package]] +name = "crc32fast" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a97769d94ddab943e4510d138150169a2758b5ef3eb191a9ee688de3e23ef7b3" +dependencies = [ + "cfg-if", +] + [[package]] name = "crunchy" version = "0.2.2" @@ -497,6 +559,16 @@ dependencies = [ "rustc_version", ] +[[package]] +name = "flate2" +version = "1.0.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f54427cfd1c7829e2a139fcefea601bf088ebca651d2bf53ebc600eac295dae" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "getrandom" version = "0.2.15" @@ -576,12 +648,27 @@ version = "2.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"b248f5224d1d606005e02c97f5aa4e88eeb230488bcc03bc9ca4d7991399f2b5" +[[package]] +name = "integer-encoding" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" + [[package]] name = "itoa" version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" +[[package]] +name = "jobserver" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.69" @@ -695,6 +782,15 @@ version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" +[[package]] +name = "lz4_flex" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5" +dependencies = [ + "twox-hash", +] + [[package]] name = "matrixmultiply" version = "0.3.8" @@ -720,6 +816,15 @@ dependencies = [ "autocfg", ] +[[package]] +name = "miniz_oxide" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8a240ddb74feaf34a79a7add65a741f3167852fba007066dcac1ca548d89c08" +dependencies = [ + "adler", +] + [[package]] name = "ndarray" version = "0.15.6" @@ -829,6 +934,15 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "ordered-float" +version = "2.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c" +dependencies = [ + "num-traits", +] + [[package]] name = "parking_lot" 
version = "0.12.3" @@ -852,6 +966,51 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "parquet" +version = "52.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f22ba0d95db56dde8685e3fadcb915cdaadda31ab8abbe3ff7f0ad1ef333267" +dependencies = [ + "ahash", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-ipc", + "arrow-schema", + "arrow-select", + "base64", + "brotli", + "bytes", + "chrono", + "flate2", + "half", + "hashbrown", + "lz4_flex", + "num", + "num-bigint", + "paste", + "seq-macro", + "snap", + "thrift", + "twox-hash", + "zstd", + "zstd-sys", +] + +[[package]] +name = "paste" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" + +[[package]] +name = "pkg-config" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" + [[package]] name = "portable-atomic" version = "1.7.0" @@ -1076,6 +1235,12 @@ dependencies = [ "serde", ] +[[package]] +name = "seq-macro" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" + [[package]] name = "serde" version = "1.0.204" @@ -1128,6 +1293,12 @@ version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" +[[package]] +name = "snap" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" + [[package]] name = "static_assertions" version = "1.1.0" @@ -1183,6 +1354,17 @@ dependencies = [ "syn", ] +[[package]] +name = "thrift" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09" +dependencies = [ + "byteorder", + "integer-encoding", + "ordered-float", +] + [[package]] name = "tiny-keccak" version = "2.0.2" @@ -1192,6 +1374,16 @@ dependencies = [ "crunchy", ] +[[package]] +name = "twox-hash" +version = "1.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" +dependencies = [ + "cfg-if", + "static_assertions", +] + [[package]] name = "unicase" version = "2.7.0" @@ -1399,3 +1591,31 @@ dependencies = [ "quote", "syn", ] + +[[package]] +name = "zstd" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcf2b778a664581e31e389454a7072dab1647606d44f7feea22cd5abb9c9f3f9" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "7.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa556e971e7b568dc775c136fc9de8c779b1c2fc3a63defaafadffdbd3181afa" +dependencies = [ + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.11+zstd.1.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75652c55c0b6f3e6f12eb786fe1bc960396bf05a1eb3bf1f3691c3610ac2e6d4" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/Cargo.toml b/Cargo.toml index afe1b7e..8acc12b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ arrow-ipc = "52" arrow-schema = "52" arrow-select = "52" numpy = "0.21" +parquet = "52" pyo3 = "0.21" pyo3-file = "0.8.1" thiserror = "1" diff --git a/arro3-io/Cargo.toml b/arro3-io/Cargo.toml index 822b150..00f0a07 100644 --- a/arro3-io/Cargo.toml +++ b/arro3-io/Cargo.toml @@ -23,6 +23,7 @@ arrow-csv = { workspace = true } arrow-ipc = { workspace = true } arrow-schema = { workspace = true } arrow = { workspace = true, features = ["ffi"] } +parquet = { workspace = true } pyo3 = { workspace = true, features = ["abi3-py38"] } pyo3-file = { 
workspace = true } thiserror = { workspace = true } diff --git a/arro3-io/python/arro3/io/_rust.pyi b/arro3-io/python/arro3/io/_rust.pyi index 0743ed6..4c49e31 100644 --- a/arro3-io/python/arro3/io/_rust.pyi +++ b/arro3-io/python/arro3/io/_rust.pyi @@ -1,7 +1,7 @@ from pathlib import Path from typing import IO, Protocol, Tuple -from arro3.core import Array, ArrayReader, Schema +from arro3.core import RecordBatchReader, Schema class ArrowSchemaExportable(Protocol): def __arrow_c_schema__(self) -> object: ... @@ -14,6 +14,8 @@ class ArrowArrayExportable(Protocol): class ArrowStreamExportable(Protocol): def __arrow_c_stream__(self, requested_schema: object | None = None) -> object: ... +#### CSV + def infer_csv_schema( file: IO[bytes] | Path | str, *, @@ -36,7 +38,7 @@ def read_csv( quote: str | None = None, terminator: str | None = None, comment: str | None = None, -) -> Schema: ... +) -> RecordBatchReader: ... def write_csv( data: ArrowStreamExportable, file: IO[bytes] | Path | str, @@ -52,3 +54,45 @@ def write_csv( timestamp_tz_format: str | None = None, null: str | None = None, ) -> None: ... + +#### JSON + +def infer_json_schema( + file: IO[bytes] | Path | str, + *, + max_records: int | None = None, +) -> Schema: ... +def read_json( + file: IO[bytes] | Path | str, + schema: ArrowSchemaExportable, + *, + batch_size: int | None = None, +) -> RecordBatchReader: ... +def write_json( + data: ArrowStreamExportable, + file: IO[bytes] | Path | str, + *, + explicit_nulls: bool | None = None, +) -> None: ... +def write_ndjson( + data: ArrowStreamExportable, + file: IO[bytes] | Path | str, + *, + explicit_nulls: bool | None = None, +) -> None: ... + +#### IPC + +def read_ipc(file: IO[bytes] | Path | str) -> RecordBatchReader: ... +def read_ipc_stream(file: IO[bytes] | Path | str) -> RecordBatchReader: ... +def write_ipc(data: ArrowStreamExportable, file: IO[bytes] | Path | str) -> None: ... 
+def write_ipc_stream(
+    data: ArrowStreamExportable, file: IO[bytes] | Path | str
+) -> None: ...
+
+#### Parquet
+
+def read_parquet(file: Path | str) -> RecordBatchReader: ...
+def write_parquet(
+    data: ArrowStreamExportable, file: IO[bytes] | Path | str
+) -> None: ...
diff --git a/arro3-io/src/lib.rs b/arro3-io/src/lib.rs
index bbbe291..97d2a83 100644
--- a/arro3-io/src/lib.rs
+++ b/arro3-io/src/lib.rs
@@ -3,6 +3,7 @@ use pyo3::prelude::*;
 mod csv;
 mod ipc;
 mod json;
+mod parquet;
 mod utils;
 
 const VERSION: &str = env!("CARGO_PKG_VERSION");
@@ -30,5 +31,8 @@ fn _rust(_py: Python, m: &Bound) -> PyResult<()> {
     m.add_wrapped(wrap_pyfunction!(ipc::write_ipc))?;
     m.add_wrapped(wrap_pyfunction!(ipc::write_ipc_stream))?;
 
+    m.add_wrapped(wrap_pyfunction!(parquet::read_parquet))?;
+    m.add_wrapped(wrap_pyfunction!(parquet::write_parquet))?;
+
     Ok(())
 }
diff --git a/arro3-io/src/parquet.rs b/arro3-io/src/parquet.rs
new file mode 100644
index 0000000..e165515
--- /dev/null
+++ b/arro3-io/src/parquet.rs
@@ -0,0 +1,53 @@
+use arrow_schema::ArrowError;
+use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+use parquet::arrow::ArrowWriter;
+use pyo3::exceptions::PyTypeError;
+use pyo3::prelude::*;
+use pyo3_arrow::error::PyArrowResult;
+use pyo3_arrow::PyRecordBatchReader;
+
+use crate::utils::{FileReader, FileWriter};
+
+/// Read a Parquet file to an Arrow RecordBatchReader
+///
+/// Only the `FileReader::File` variant is handled; the resulting reader is returned to
+/// Python through `to_arro3`.
+///
+/// # Errors
+///
+/// Returns an error if the Parquet reader cannot be created or built for the file, and a
+/// `TypeError` for `FileReader::FileLike` input, which is not supported yet.
+#[pyfunction]
+pub fn read_parquet(py: Python, file: FileReader) -> PyArrowResult {
+    match file {
+        FileReader::File(f) => {
+            // Report reader failures to Python as exceptions instead of panicking.
+            let builder = ParquetRecordBatchReaderBuilder::try_new(f).map_err(ArrowError::from)?;
+            let reader = builder.build().map_err(ArrowError::from)?;
+            Ok(PyRecordBatchReader::new(Box::new(reader)).to_arro3(py)?)
+        }
+        FileReader::FileLike(_) => {
+            Err(PyTypeError::new_err("File objects not yet supported for reading parquet").into())
+        }
+    }
+}
+
+/// Write an Arrow Table or stream to a Parquet file
+///
+/// Batches are pulled from `data` one at a time; no custom writer properties are set.
+///
+/// # Errors
+///
+/// Returns an error if the writer cannot be created, if the input stream yields an error,
+/// or if writing a batch or closing the writer fails.
+#[pyfunction]
+pub fn write_parquet(data: PyRecordBatchReader, file: FileWriter) -> PyArrowResult<()> {
+    let reader = data.into_reader()?;
+    let mut writer = ArrowWriter::try_new(file, reader.schema(), None).map_err(ArrowError::from)?;
+    for batch in reader {
+        writer.write(&batch?).map_err(ArrowError::from)?;
+    }
+    // The Parquet footer is only written on close; skipping it leaves an unreadable file.
+    writer.close().map_err(ArrowError::from)?;
+    Ok(())
+}