diff --git a/crates/polars-io/src/mmap.rs b/crates/polars-io/src/mmap.rs index 2373257469e7..f7674941b8dc 100644 --- a/crates/polars-io/src/mmap.rs +++ b/crates/polars-io/src/mmap.rs @@ -2,6 +2,7 @@ use std::fs::File; use std::io::{BufReader, Cursor, Read, Seek}; use polars_core::config::verbose; +use polars_utils::file::ClosableFile; use polars_utils::mmap::MemSlice; /// Trait used to get a hold to file handler or to the underlying bytes @@ -22,6 +23,12 @@ impl MmapBytesReader for File { } } +impl MmapBytesReader for ClosableFile { + fn to_file(&self) -> Option<&File> { + Some(self.as_ref()) + } +} + impl MmapBytesReader for BufReader { fn to_file(&self) -> Option<&File> { Some(self.get_ref()) diff --git a/crates/polars-io/src/utils/file.rs b/crates/polars-io/src/utils/file.rs index 9bf69c3da0f1..a788871fc173 100644 --- a/crates/polars-io/src/utils/file.rs +++ b/crates/polars-io/src/utils/file.rs @@ -1,19 +1,23 @@ -use std::io::Write; use std::path::Path; use polars_core::config; use polars_error::{feature_gated, PolarsResult}; use polars_utils::create_file; +use polars_utils::file::{ClosableFile, WriteClose}; use polars_utils::mmap::ensure_not_mapped; use crate::cloud::CloudOptions; +#[cfg(feature = "cloud")] +use crate::cloud::CloudWriter; use crate::{is_cloud_url, resolve_homedir}; +#[cfg(feature = "cloud")] +impl WriteClose for CloudWriter {} /// Open a path for writing. Supports cloud paths. pub fn try_get_writeable( path: &str, #[cfg_attr(not(feature = "cloud"), allow(unused))] cloud_options: Option<&CloudOptions>, -) -> PolarsResult> { +) -> PolarsResult> { let is_cloud = is_cloud_url(path); let verbose = config::verbose(); @@ -80,6 +84,7 @@ pub fn try_get_writeable( ) } - Ok(Box::new(polars_utils::open_file_write(&path)?)) + let f: ClosableFile = polars_utils::open_file_write(&path)?.into(); + Ok(Box::new(f)) } } diff --git a/crates/polars-pipe/src/executors/sinks/output/parquet.rs b/crates/polars-pipe/src/executors/sinks/output/parquet.rs index 7222b34ff153..b99d41bf9df2 100644 --- a/crates/polars-pipe/src/executors/sinks/output/parquet.rs +++ b/crates/polars-pipe/src/executors/sinks/output/parquet.rs @@ -9,6 +9,7 @@ use polars_io::parquet::write::{ BatchedWriter, ParquetWriteOptions, ParquetWriter, RowGroupIterColumns, }; use polars_io::utils::file::try_get_writeable; +use polars_utils::file::WriteClose; use crate::executors::sinks::output::file_sink::SinkWriter; use crate::operators::{DataChunk, FinalizedSink, PExecutionContext, Sink, SinkResult}; @@ -58,7 +59,7 @@ where #[derive(Clone)] pub struct ParquetSink { - writer: Arc>>, + writer: Arc>>, io_thread_handle: Arc>>, sender: Sender>, } diff --git a/crates/polars-python/src/conversion/mod.rs b/crates/polars-python/src/conversion/mod.rs index 4ebc90b11561..7ac793f623ce 100644 --- a/crates/polars-python/src/conversion/mod.rs +++ b/crates/polars-python/src/conversion/mod.rs @@ -570,7 +570,7 @@ impl<'py> FromPyObject<'py> for Wrap { }, PythonScanSourceInput::File(file) => { let mut sources = Vec::with_capacity(num_items); - sources.push(file); + sources.push(file.into()); MutableSources::Files(sources) }, PythonScanSourceInput::Buffer(buffer) => { @@ -583,7 +583,7 @@ impl<'py> FromPyObject<'py> for Wrap { for source in iter { match (&mut sources, source?) { (MutableSources::Paths(v), PythonScanSourceInput::Path(p)) => v.push(p), - (MutableSources::Files(v), PythonScanSourceInput::File(f)) => v.push(f), + (MutableSources::Files(v), PythonScanSourceInput::File(f)) => v.push(f.into()), (MutableSources::Buffers(v), PythonScanSourceInput::Buffer(f)) => v.push(f), _ => { return Err(PyTypeError::new_err( diff --git a/crates/polars-python/src/dataframe/io.rs b/crates/polars-python/src/dataframe/io.rs index 9cbe678211a2..f917489dc637 100644 --- a/crates/polars-python/src/dataframe/io.rs +++ b/crates/polars-python/src/dataframe/io.rs @@ -372,10 +372,10 @@ impl PyDataFrame { #[cfg(not(feature = "cloud"))] let cloud_options = None; - let f = crate::file::try_get_writeable(py_f, cloud_options.as_ref())?; + let mut f = crate::file::try_get_writeable(py_f, cloud_options.as_ref())?; py.enter_polars(|| { - CsvWriter::new(f) + CsvWriter::new(&mut f) .include_bom(include_bom) .include_header(include_header) .with_separator(separator) @@ -389,7 +389,9 @@ impl PyDataFrame { .with_float_precision(float_precision) .with_null_value(null) .with_quote_style(quote_style.map(|wrap| wrap.0).unwrap_or_default()) - .finish(&mut self.df) + .finish(&mut self.df)?; + + crate::file::close_file(f) }) } @@ -455,44 +457,46 @@ impl PyDataFrame { }); }; - let f = crate::file::try_get_writeable(py_f, cloud_options.as_ref())?; + let mut f = crate::file::try_get_writeable(py_f, cloud_options.as_ref())?; py.enter_polars(|| { - ParquetWriter::new(BufWriter::new(f)) + ParquetWriter::new(BufWriter::new(&mut f)) .with_compression(compression) .with_statistics(statistics.0) .with_row_group_size(row_group_size) .with_data_page_size(data_page_size) - .finish(&mut self.df) - .map(|_| ()) + .finish(&mut self.df)?; + + crate::file::close_file(f) }) } #[cfg(feature = "json")] - pub fn write_json(&mut self, py_f: PyObject) -> PyResult<()> { + pub fn write_json(&mut self, py: Python, py_f: PyObject) -> PyResult<()> { let file = BufWriter::new(get_file_like(py_f, true)?); + py.enter_polars(|| { + // TODO: Cloud support - // TODO: Cloud support - - JsonWriter::new(file) - .with_json_format(JsonFormat::Json) - .finish(&mut self.df) - .map_err(PyPolarsErr::from)?; - Ok(()) + JsonWriter::new(file) + .with_json_format(JsonFormat::Json) + .finish(&mut self.df) + }) } #[cfg(feature = "json")] - pub fn write_ndjson(&mut self, py_f: PyObject) -> PyResult<()> { + pub fn write_ndjson(&mut self, py: Python, py_f: PyObject) -> PyResult<()> { let file = BufWriter::new(get_file_like(py_f, true)?); // TODO: Cloud support - JsonWriter::new(file) - .with_json_format(JsonFormat::JsonLines) - .finish(&mut self.df) - .map_err(PyPolarsErr::from)?; + py.enter_polars(|| { + // TODO: Cloud support - Ok(()) + JsonWriter::new(file) + .with_json_format(JsonFormat::JsonLines) + .finish(&mut self.df) + .map_err(PyPolarsErr::from) + }) } #[cfg(feature = "ipc")] @@ -526,13 +530,15 @@ impl PyDataFrame { #[cfg(not(feature = "cloud"))] let cloud_options = None; - let f = crate::file::try_get_writeable(py_f, cloud_options.as_ref())?; + let mut f = crate::file::try_get_writeable(py_f, cloud_options.as_ref())?; py.enter_polars(|| { - IpcWriter::new(f) + IpcWriter::new(&mut f) .with_compression(compression.0) .with_compat_level(compat_level.0) - .finish(&mut self.df) + .finish(&mut self.df)?; + + crate::file::close_file(f) }) } diff --git a/crates/polars-python/src/file.rs b/crates/polars-python/src/file.rs index e48111077619..b15dd06d7970 100644 --- a/crates/polars-python/src/file.rs +++ b/crates/polars-python/src/file.rs @@ -10,9 +10,10 @@ use std::path::PathBuf; use std::sync::Arc; use polars::io::mmap::MmapBytesReader; -use polars_error::polars_err; +use polars_error::{polars_err, PolarsResult}; use polars_io::cloud::CloudOptions; use polars_utils::create_file; +use polars_utils::file::{ClosableFile, WriteClose}; use polars_utils::mmap::MemSlice; use pyo3::exceptions::PyTypeError; use pyo3::prelude::*; @@ -22,10 +23,12 @@ use pyo3::IntoPyObjectExt; use crate::error::PyPolarsErr; use crate::prelude::resolve_homedir; -pub struct PyFileLikeObject { +pub(crate) struct PyFileLikeObject { inner: PyObject, } +impl WriteClose for PyFileLikeObject {} + impl Clone for PyFileLikeObject { fn clone(&self) -> Self { Python::with_gil(|py| Self { @@ -39,11 +42,11 @@ impl PyFileLikeObject { /// Creates an instance of a `PyFileLikeObject` from a `PyObject`. /// To assert the object has the required methods, /// instantiate it with `PyFileLikeObject::require` - pub fn new(object: PyObject) -> Self { + pub(crate) fn new(object: PyObject) -> Self { PyFileLikeObject { inner: object } } - pub fn to_memslice(&self) -> MemSlice { + pub(crate) fn to_memslice(&self) -> MemSlice { Python::with_gil(|py| { let bytes = self .inner @@ -70,7 +73,7 @@ impl PyFileLikeObject { /// Validates that the underlying /// python object has a `read`, `write`, and `seek` methods in respect to parameters. /// Will return a `TypeError` if object does not have `read`, `seek`, and `write` methods. - pub fn ensure_requirements( + pub(crate) fn ensure_requirements( object: &Bound, read: bool, write: bool, @@ -185,19 +188,20 @@ impl Seek for PyFileLikeObject { } } -pub trait FileLike: Read + Write + Seek + Sync + Send {} +pub(crate) trait FileLike: Read + Write + Seek + Sync + Send {} impl FileLike for File {} +impl FileLike for ClosableFile {} impl FileLike for PyFileLikeObject {} impl MmapBytesReader for PyFileLikeObject {} -pub enum EitherRustPythonFile { +pub(crate) enum EitherRustPythonFile { Py(PyFileLikeObject), - Rust(File), + Rust(ClosableFile), } impl EitherRustPythonFile { - pub fn into_dyn(self) -> Box { + pub(crate) fn into_dyn(self) -> Box { match self { EitherRustPythonFile::Py(f) => Box::new(f), EitherRustPythonFile::Rust(f) => Box::new(f), @@ -211,7 +215,7 @@ impl EitherRustPythonFile { } } - pub fn into_dyn_writeable(self) -> Box { + pub(crate) fn into_dyn_writeable(self) -> Box { match self { EitherRustPythonFile::Py(f) => Box::new(f), EitherRustPythonFile::Rust(f) => Box::new(f), @@ -219,10 +223,10 @@ impl EitherRustPythonFile { } } -pub enum PythonScanSourceInput { +pub(crate) enum PythonScanSourceInput { Buffer(MemSlice), Path(PathBuf), - File(File), + File(ClosableFile), } fn try_get_pyfile( @@ -282,7 +286,7 @@ fn try_get_pyfile( .map(|fileno| fileno as RawFd) { return Ok(( - EitherRustPythonFile::Rust(unsafe { File::from_raw_fd(fd) }), + EitherRustPythonFile::Rust(unsafe { File::from_raw_fd(fd).into() }), // This works on Linux and BSD with procfs mounted, // otherwise it fails silently. fs::canonicalize(format!("/proc/self/fd/{fd}")).ok(), @@ -317,7 +321,7 @@ fn try_get_pyfile( Ok((EitherRustPythonFile::Py(f), None)) } -pub fn get_python_scan_source_input( +pub(crate) fn get_python_scan_source_input( py_f: PyObject, write: bool, ) -> PyResult { @@ -361,7 +365,7 @@ fn get_either_buffer_or_path( } else { polars_utils::open_file(&file_path).map_err(PyPolarsErr::from)? }; - Ok((EitherRustPythonFile::Rust(f), Some(file_path))) + Ok((EitherRustPythonFile::Rust(f.into()), Some(file_path))) } else { try_get_pyfile(py, py_f, write) } @@ -371,11 +375,11 @@ fn get_either_buffer_or_path( /// /// # Arguments /// * `write` - open for writing; will truncate existing file and create new file if not. -pub fn get_either_file(py_f: PyObject, write: bool) -> PyResult { +pub(crate) fn get_either_file(py_f: PyObject, write: bool) -> PyResult { Ok(get_either_buffer_or_path(py_f, write)?.0) } -pub fn get_file_like(f: PyObject, truncate: bool) -> PyResult> { +pub(crate) fn get_file_like(f: PyObject, truncate: bool) -> PyResult> { Ok(get_either_file(f, truncate)?.into_dyn()) } @@ -395,11 +399,11 @@ fn read_if_bytesio(py_f: Bound) -> Bound { } /// Create reader from PyBytes or a file-like object. -pub fn get_mmap_bytes_reader(py_f: &Bound) -> PyResult> { +pub(crate) fn get_mmap_bytes_reader(py_f: &Bound) -> PyResult> { get_mmap_bytes_reader_and_path(py_f).map(|t| t.0) } -pub fn get_mmap_bytes_reader_and_path( +pub(crate) fn get_mmap_bytes_reader_and_path( py_f: &Bound, ) -> PyResult<(Box, Option)> { let py_f = read_if_bytesio(py_f.clone()); @@ -425,10 +429,10 @@ pub fn get_mmap_bytes_reader_and_path( } } -pub fn try_get_writeable( +pub(crate) fn try_get_writeable( py_f: PyObject, cloud_options: Option<&CloudOptions>, -) -> PyResult> { +) -> PyResult> { Python::with_gil(|py| { let py_f = py_f.into_bound(py); @@ -441,3 +445,10 @@ pub fn try_get_writeable( } }) } + +pub(crate) fn close_file(f: Box) -> PolarsResult<()> { + f.close().map_err(|e| { + let err: polars_error::PolarsError = e.into(); + err + }) +} diff --git a/crates/polars-python/src/lazyframe/general.rs b/crates/polars-python/src/lazyframe/general.rs index f90300e4fd61..06730e61d95f 100644 --- a/crates/polars-python/src/lazyframe/general.rs +++ b/crates/polars-python/src/lazyframe/general.rs @@ -32,7 +32,7 @@ fn pyobject_to_first_path_and_scan_sources( PythonScanSourceInput::Path(path) => { (Some(path.clone()), ScanSources::Paths([path].into())) }, - PythonScanSourceInput::File(file) => (None, ScanSources::Files([file].into())), + PythonScanSourceInput::File(file) => (None, ScanSources::Files([file.into()].into())), PythonScanSourceInput::Buffer(buff) => (None, ScanSources::Buffers([buff].into())), }) } diff --git a/crates/polars-utils/src/file.rs b/crates/polars-utils/src/file.rs new file mode 100644 index 000000000000..60271e0a492d --- /dev/null +++ b/crates/polars-utils/src/file.rs @@ -0,0 +1,83 @@ +use std::fs::File; +use std::io::{Read, Seek, Write}; + +impl From for ClosableFile { + fn from(value: File) -> Self { + ClosableFile { inner: value } + } +} + +impl From for File { + fn from(value: ClosableFile) -> Self { + value.inner + } +} + +pub struct ClosableFile { + inner: File, +} + +impl ClosableFile { + #[cfg(unix)] + pub fn close(self) -> std::io::Result<()> { + use std::os::fd::IntoRawFd; + let fd = self.inner.into_raw_fd(); + + match unsafe { libc::close(fd) } { + 0 => Ok(()), + _ => Err(std::io::Error::last_os_error()), + } + } + + #[cfg(not(unix))] + pub fn close(self) -> std::io::Result<()> { + Ok(()) + } +} + +impl AsMut for ClosableFile { + fn as_mut(&mut self) -> &mut File { + &mut self.inner + } +} + +impl AsRef for ClosableFile { + fn as_ref(&self) -> &File { + &self.inner + } +} + +impl Seek for ClosableFile { + fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result { + self.inner.seek(pos) + } +} + +impl Read for ClosableFile { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + self.inner.read(buf) + } +} + +impl Write for ClosableFile { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.inner.write(buf) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.inner.flush() + } +} + +pub trait WriteClose: Write { + fn close(self: Box) -> std::io::Result<()> { + Ok(()) + } +} + +impl WriteClose for ClosableFile { + fn close(self: Box) -> std::io::Result<()> { + let f = *self; + f.close() + } +} diff --git a/crates/polars-utils/src/lib.rs b/crates/polars-utils/src/lib.rs index cbb73a4526b6..f0126825a537 100644 --- a/crates/polars-utils/src/lib.rs +++ b/crates/polars-utils/src/lib.rs @@ -37,6 +37,7 @@ pub mod sys; pub mod total_ord; pub use functions::*; +pub mod file; pub mod aliases; pub mod fixedringbuffer;