Skip to content

Commit

Permalink
feat: Closeable files on unix (#21588)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Mar 5, 2025
1 parent b79df25 commit 55be31f
Show file tree
Hide file tree
Showing 9 changed files with 166 additions and 52 deletions.
7 changes: 7 additions & 0 deletions crates/polars-io/src/mmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::fs::File;
use std::io::{BufReader, Cursor, Read, Seek};

use polars_core::config::verbose;
use polars_utils::file::ClosableFile;
use polars_utils::mmap::MemSlice;

/// Trait used to get a hold to file handler or to the underlying bytes
Expand All @@ -22,6 +23,12 @@ impl MmapBytesReader for File {
}
}

impl MmapBytesReader for ClosableFile {
    /// Always yields a [`File`] reference, borrowed through `ClosableFile`'s
    /// `AsRef<File>` implementation.
    fn to_file(&self) -> Option<&File> {
        let file: &File = self.as_ref();
        Some(file)
    }
}

impl MmapBytesReader for BufReader<File> {
fn to_file(&self) -> Option<&File> {
Some(self.get_ref())
Expand Down
11 changes: 8 additions & 3 deletions crates/polars-io/src/utils/file.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
use std::io::Write;
use std::path::Path;

use polars_core::config;
use polars_error::{feature_gated, PolarsResult};
use polars_utils::create_file;
use polars_utils::file::{ClosableFile, WriteClose};
use polars_utils::mmap::ensure_not_mapped;

use crate::cloud::CloudOptions;
#[cfg(feature = "cloud")]
use crate::cloud::CloudWriter;
use crate::{is_cloud_url, resolve_homedir};
// Marker impl: the empty body means `CloudWriter` takes every `WriteClose` item
// from the trait's defaults. Only compiled when the `cloud` feature is enabled.
// NOTE(review): presumably needed so a `CloudWriter` can be returned as
// `Box<dyn WriteClose + Send>` by `try_get_writeable` — confirm against the cloud branch.
#[cfg(feature = "cloud")]
impl WriteClose for CloudWriter {}

/// Open a path for writing. Supports cloud paths.
pub fn try_get_writeable(
path: &str,
#[cfg_attr(not(feature = "cloud"), allow(unused))] cloud_options: Option<&CloudOptions>,
) -> PolarsResult<Box<dyn Write + Send>> {
) -> PolarsResult<Box<dyn WriteClose + Send>> {
let is_cloud = is_cloud_url(path);
let verbose = config::verbose();

Expand Down Expand Up @@ -80,6 +84,7 @@ pub fn try_get_writeable(
)
}

Ok(Box::new(polars_utils::open_file_write(&path)?))
let f: ClosableFile = polars_utils::open_file_write(&path)?.into();
Ok(Box::new(f))
}
}
3 changes: 2 additions & 1 deletion crates/polars-pipe/src/executors/sinks/output/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use polars_io::parquet::write::{
BatchedWriter, ParquetWriteOptions, ParquetWriter, RowGroupIterColumns,
};
use polars_io::utils::file::try_get_writeable;
use polars_utils::file::WriteClose;

use crate::executors::sinks::output::file_sink::SinkWriter;
use crate::operators::{DataChunk, FinalizedSink, PExecutionContext, Sink, SinkResult};
Expand Down Expand Up @@ -58,7 +59,7 @@ where

#[derive(Clone)]
pub struct ParquetSink {
writer: Arc<BatchedWriter<Box<dyn std::io::Write + Send + 'static>>>,
writer: Arc<BatchedWriter<Box<dyn WriteClose + Send + 'static>>>,
io_thread_handle: Arc<Option<JoinHandle<()>>>,
sender: Sender<Option<(IdxSize, RowGroups)>>,
}
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-python/src/conversion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ impl<'py> FromPyObject<'py> for Wrap<ScanSources> {
},
PythonScanSourceInput::File(file) => {
let mut sources = Vec::with_capacity(num_items);
sources.push(file);
sources.push(file.into());
MutableSources::Files(sources)
},
PythonScanSourceInput::Buffer(buffer) => {
Expand All @@ -583,7 +583,7 @@ impl<'py> FromPyObject<'py> for Wrap<ScanSources> {
for source in iter {
match (&mut sources, source?) {
(MutableSources::Paths(v), PythonScanSourceInput::Path(p)) => v.push(p),
(MutableSources::Files(v), PythonScanSourceInput::File(f)) => v.push(f),
(MutableSources::Files(v), PythonScanSourceInput::File(f)) => v.push(f.into()),
(MutableSources::Buffers(v), PythonScanSourceInput::Buffer(f)) => v.push(f),
_ => {
return Err(PyTypeError::new_err(
Expand Down
54 changes: 30 additions & 24 deletions crates/polars-python/src/dataframe/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,10 +372,10 @@ impl PyDataFrame {
#[cfg(not(feature = "cloud"))]
let cloud_options = None;

let f = crate::file::try_get_writeable(py_f, cloud_options.as_ref())?;
let mut f = crate::file::try_get_writeable(py_f, cloud_options.as_ref())?;

py.enter_polars(|| {
CsvWriter::new(f)
CsvWriter::new(&mut f)
.include_bom(include_bom)
.include_header(include_header)
.with_separator(separator)
Expand All @@ -389,7 +389,9 @@ impl PyDataFrame {
.with_float_precision(float_precision)
.with_null_value(null)
.with_quote_style(quote_style.map(|wrap| wrap.0).unwrap_or_default())
.finish(&mut self.df)
.finish(&mut self.df)?;

crate::file::close_file(f)
})
}

Expand Down Expand Up @@ -455,44 +457,46 @@ impl PyDataFrame {
});
};

let f = crate::file::try_get_writeable(py_f, cloud_options.as_ref())?;
let mut f = crate::file::try_get_writeable(py_f, cloud_options.as_ref())?;

py.enter_polars(|| {
ParquetWriter::new(BufWriter::new(f))
ParquetWriter::new(BufWriter::new(&mut f))
.with_compression(compression)
.with_statistics(statistics.0)
.with_row_group_size(row_group_size)
.with_data_page_size(data_page_size)
.finish(&mut self.df)
.map(|_| ())
.finish(&mut self.df)?;

crate::file::close_file(f)
})
}

#[cfg(feature = "json")]
pub fn write_json(&mut self, py_f: PyObject) -> PyResult<()> {
pub fn write_json(&mut self, py: Python, py_f: PyObject) -> PyResult<()> {
let file = BufWriter::new(get_file_like(py_f, true)?);
py.enter_polars(|| {
// TODO: Cloud support

// TODO: Cloud support

JsonWriter::new(file)
.with_json_format(JsonFormat::Json)
.finish(&mut self.df)
.map_err(PyPolarsErr::from)?;
Ok(())
JsonWriter::new(file)
.with_json_format(JsonFormat::Json)
.finish(&mut self.df)
})
}

#[cfg(feature = "json")]
pub fn write_ndjson(&mut self, py_f: PyObject) -> PyResult<()> {
pub fn write_ndjson(&mut self, py: Python, py_f: PyObject) -> PyResult<()> {
let file = BufWriter::new(get_file_like(py_f, true)?);

// TODO: Cloud support

JsonWriter::new(file)
.with_json_format(JsonFormat::JsonLines)
.finish(&mut self.df)
.map_err(PyPolarsErr::from)?;
py.enter_polars(|| {
// TODO: Cloud support

Ok(())
JsonWriter::new(file)
.with_json_format(JsonFormat::JsonLines)
.finish(&mut self.df)
.map_err(PyPolarsErr::from)
})
}

#[cfg(feature = "ipc")]
Expand Down Expand Up @@ -526,13 +530,15 @@ impl PyDataFrame {
#[cfg(not(feature = "cloud"))]
let cloud_options = None;

let f = crate::file::try_get_writeable(py_f, cloud_options.as_ref())?;
let mut f = crate::file::try_get_writeable(py_f, cloud_options.as_ref())?;

py.enter_polars(|| {
IpcWriter::new(f)
IpcWriter::new(&mut f)
.with_compression(compression.0)
.with_compat_level(compat_level.0)
.finish(&mut self.df)
.finish(&mut self.df)?;

crate::file::close_file(f)
})
}

Expand Down
53 changes: 32 additions & 21 deletions crates/polars-python/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ use std::path::PathBuf;
use std::sync::Arc;

use polars::io::mmap::MmapBytesReader;
use polars_error::polars_err;
use polars_error::{polars_err, PolarsResult};
use polars_io::cloud::CloudOptions;
use polars_utils::create_file;
use polars_utils::file::{ClosableFile, WriteClose};
use polars_utils::mmap::MemSlice;
use pyo3::exceptions::PyTypeError;
use pyo3::prelude::*;
Expand All @@ -22,10 +23,12 @@ use pyo3::IntoPyObjectExt;
use crate::error::PyPolarsErr;
use crate::prelude::resolve_homedir;

pub struct PyFileLikeObject {
pub(crate) struct PyFileLikeObject {
inner: PyObject,
}

// Marker impl: the empty body means `PyFileLikeObject` takes every `WriteClose` item
// from the trait's defaults. It lets `EitherRustPythonFile::into_dyn_writeable` box the
// Python-backed variant as `Box<dyn WriteClose + Send>`.
impl WriteClose for PyFileLikeObject {}

impl Clone for PyFileLikeObject {
fn clone(&self) -> Self {
Python::with_gil(|py| Self {
Expand All @@ -39,11 +42,11 @@ impl PyFileLikeObject {
/// Creates an instance of a `PyFileLikeObject` from a `PyObject`.
/// To assert the object has the required methods,
/// instantiate it with `PyFileLikeObject::require`
pub fn new(object: PyObject) -> Self {
pub(crate) fn new(object: PyObject) -> Self {
PyFileLikeObject { inner: object }
}

pub fn to_memslice(&self) -> MemSlice {
pub(crate) fn to_memslice(&self) -> MemSlice {
Python::with_gil(|py| {
let bytes = self
.inner
Expand All @@ -70,7 +73,7 @@ impl PyFileLikeObject {
/// Validates that the underlying Python object has the `read`, `write`, and `seek`
/// methods requested by the corresponding parameters.
/// Returns a `TypeError` if the object lacks any of the required methods.
pub fn ensure_requirements(
pub(crate) fn ensure_requirements(
object: &Bound<PyAny>,
read: bool,
write: bool,
Expand Down Expand Up @@ -185,19 +188,20 @@ impl Seek for PyFileLikeObject {
}
}

pub trait FileLike: Read + Write + Seek + Sync + Send {}
pub(crate) trait FileLike: Read + Write + Seek + Sync + Send {}

impl FileLike for File {}
impl FileLike for ClosableFile {}
impl FileLike for PyFileLikeObject {}
impl MmapBytesReader for PyFileLikeObject {}

pub enum EitherRustPythonFile {
pub(crate) enum EitherRustPythonFile {
Py(PyFileLikeObject),
Rust(File),
Rust(ClosableFile),
}

impl EitherRustPythonFile {
pub fn into_dyn(self) -> Box<dyn FileLike> {
pub(crate) fn into_dyn(self) -> Box<dyn FileLike> {
match self {
EitherRustPythonFile::Py(f) => Box::new(f),
EitherRustPythonFile::Rust(f) => Box::new(f),
Expand All @@ -211,18 +215,18 @@ impl EitherRustPythonFile {
}
}

pub fn into_dyn_writeable(self) -> Box<dyn Write + Send> {
pub(crate) fn into_dyn_writeable(self) -> Box<dyn WriteClose + Send> {
match self {
EitherRustPythonFile::Py(f) => Box::new(f),
EitherRustPythonFile::Rust(f) => Box::new(f),
}
}
}

pub enum PythonScanSourceInput {
pub(crate) enum PythonScanSourceInput {
Buffer(MemSlice),
Path(PathBuf),
File(File),
File(ClosableFile),
}

fn try_get_pyfile(
Expand Down Expand Up @@ -282,7 +286,7 @@ fn try_get_pyfile(
.map(|fileno| fileno as RawFd)
{
return Ok((
EitherRustPythonFile::Rust(unsafe { File::from_raw_fd(fd) }),
EitherRustPythonFile::Rust(unsafe { File::from_raw_fd(fd).into() }),
// This works on Linux and BSD with procfs mounted,
// otherwise it fails silently.
fs::canonicalize(format!("/proc/self/fd/{fd}")).ok(),
Expand Down Expand Up @@ -317,7 +321,7 @@ fn try_get_pyfile(
Ok((EitherRustPythonFile::Py(f), None))
}

pub fn get_python_scan_source_input(
pub(crate) fn get_python_scan_source_input(
py_f: PyObject,
write: bool,
) -> PyResult<PythonScanSourceInput> {
Expand Down Expand Up @@ -361,7 +365,7 @@ fn get_either_buffer_or_path(
} else {
polars_utils::open_file(&file_path).map_err(PyPolarsErr::from)?
};
Ok((EitherRustPythonFile::Rust(f), Some(file_path)))
Ok((EitherRustPythonFile::Rust(f.into()), Some(file_path)))
} else {
try_get_pyfile(py, py_f, write)
}
Expand All @@ -371,11 +375,11 @@ fn get_either_buffer_or_path(
///
/// # Arguments
/// * `write` - open for writing; will truncate existing file and create new file if not.
pub fn get_either_file(py_f: PyObject, write: bool) -> PyResult<EitherRustPythonFile> {
pub(crate) fn get_either_file(py_f: PyObject, write: bool) -> PyResult<EitherRustPythonFile> {
Ok(get_either_buffer_or_path(py_f, write)?.0)
}

pub fn get_file_like(f: PyObject, truncate: bool) -> PyResult<Box<dyn FileLike>> {
pub(crate) fn get_file_like(f: PyObject, truncate: bool) -> PyResult<Box<dyn FileLike>> {
Ok(get_either_file(f, truncate)?.into_dyn())
}

Expand All @@ -395,11 +399,11 @@ fn read_if_bytesio(py_f: Bound<PyAny>) -> Bound<PyAny> {
}

/// Create reader from PyBytes or a file-like object.
pub fn get_mmap_bytes_reader(py_f: &Bound<PyAny>) -> PyResult<Box<dyn MmapBytesReader>> {
pub(crate) fn get_mmap_bytes_reader(py_f: &Bound<PyAny>) -> PyResult<Box<dyn MmapBytesReader>> {
get_mmap_bytes_reader_and_path(py_f).map(|t| t.0)
}

pub fn get_mmap_bytes_reader_and_path(
pub(crate) fn get_mmap_bytes_reader_and_path(
py_f: &Bound<PyAny>,
) -> PyResult<(Box<dyn MmapBytesReader>, Option<PathBuf>)> {
let py_f = read_if_bytesio(py_f.clone());
Expand All @@ -425,10 +429,10 @@ pub fn get_mmap_bytes_reader_and_path(
}
}

pub fn try_get_writeable(
pub(crate) fn try_get_writeable(
py_f: PyObject,
cloud_options: Option<&CloudOptions>,
) -> PyResult<Box<dyn Write + Send>> {
) -> PyResult<Box<dyn WriteClose + Send>> {
Python::with_gil(|py| {
let py_f = py_f.into_bound(py);

Expand All @@ -441,3 +445,10 @@ pub fn try_get_writeable(
}
})
}

/// Closes a writer and reports any failure as a Polars error.
///
/// Takes ownership of the boxed [`WriteClose`] and calls its `close` method.
///
/// # Errors
/// Returns the error produced by `close`, converted into
/// [`polars_error::PolarsError`].
pub(crate) fn close_file(f: Box<dyn WriteClose>) -> PolarsResult<()> {
    // The target error type is fixed by the return type, so the conversion needs no
    // intermediate binding.
    f.close().map_err(Into::into)
}
2 changes: 1 addition & 1 deletion crates/polars-python/src/lazyframe/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ fn pyobject_to_first_path_and_scan_sources(
PythonScanSourceInput::Path(path) => {
(Some(path.clone()), ScanSources::Paths([path].into()))
},
PythonScanSourceInput::File(file) => (None, ScanSources::Files([file].into())),
PythonScanSourceInput::File(file) => (None, ScanSources::Files([file.into()].into())),
PythonScanSourceInput::Buffer(buff) => (None, ScanSources::Buffers([buff].into())),
})
}
Expand Down
Loading

0 comments on commit 55be31f

Please sign in to comment.