[FEAT] Add Flag to_arrow to convert large string arrays (#1283)
* add flag in `to_arrow` that converts large string and binary arrays to their int32 offset counterparts
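
For orientation, a minimal usage sketch of the new flag. `daft_table` is a hypothetical in-scope `Table` instance; only `to_arrow` and `convert_large_arrays` come from this commit:

    import pyarrow as pa

    # With convert_large_arrays=True, large_string/large_binary columns in the
    # resulting pyarrow Table are cast down to their int32-offset counterparts
    # (string/binary), as long as each chunk's final offset stays below 2**31.
    arrow_tab: pa.Table = daft_table.to_arrow(convert_large_arrays=True)
    assert not any(pa.types.is_large_string(t) for t in arrow_tab.schema.types)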
samster25 authored Aug 18, 2023
1 parent 9e4e20f commit 857162c
Showing 1 changed file with 37 additions and 3 deletions.

daft/table/table.py
@@ -163,7 +163,7 @@ def slice(self, start: int, end: int) -> Table:
     # Exporting methods
     ###

-    def to_arrow(self, cast_tensors_to_ray_tensor_dtype: bool = False) -> pa.Table:
+    def to_arrow(self, cast_tensors_to_ray_tensor_dtype: bool = False, convert_large_arrays: bool = False) -> pa.Table:
         python_fields = set()
         tensor_fields = set()
         for field in self.schema():
@@ -181,9 +181,18 @@ def to_arrow(self, cast_tensors_to_ray_tensor_dtype: bool = False) -> pa.Table:
                 column = column_series.to_arrow(cast_tensors_to_ray_tensor_dtype)
                 table[colname] = column

-            return pa.Table.from_pydict(table)
+            tab = pa.Table.from_pydict(table)
         else:
-            return pa.Table.from_batches([self._table.to_arrow_record_batch()])
+            tab = pa.Table.from_batches([self._table.to_arrow_record_batch()])
+
+        if not convert_large_arrays:
+            return tab
+
+        new_columns = []
+        for col in tab.columns:
+            new_columns.append(_trim_pyarrow_large_arrays(col))
+
+        return pa.Table.from_arrays(new_columns, names=tab.column_names)

     def to_pydict(self) -> dict[str, list]:
         return {colname: self.get_column(colname).to_pylist() for colname in self.column_names()}
@@ -403,3 +412,28 @@ def read_parquet_statistics(
     paths = Series.from_pylist(paths, name="uris")
     assert paths.name() == "uris", f"Expected input series to have name 'uris', but found: {paths.name()}"
     return Table._from_pytable(_read_parquet_statistics(uris=paths._series, io_config=io_config))
+
+
+def _trim_pyarrow_large_arrays(arr: pa.ChunkedArray) -> pa.ChunkedArray:
+
+    if pa.types.is_large_binary(arr.type) or pa.types.is_large_string(arr.type):
+        if pa.types.is_large_binary(arr.type):
+            target_type = pa.binary()
+        else:
+            target_type = pa.string()
+
+        all_chunks = []
+        for chunk in arr.chunks:
+            if len(chunk) == 0:
+                continue
+            offsets = np.frombuffer(chunk.buffers()[1], dtype=np.int64)
+            if offsets[-1] < 2**31:
+                all_chunks.append(chunk.cast(target_type))
+            else:
+                raise ValueError(
+                    f"Can not convert {arr.type} into {target_type} due to the offset array being too large: {offsets[-1]}. Maximum: {2**31}"
+                )
+
+        return pa.chunked_array(all_chunks, type=target_type)
+    else:
+        return arr
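
As a behavioral sketch of the new helper (assumes pyarrow is importable and `_trim_pyarrow_large_arrays` from the diff above is in scope):

    import pyarrow as pa

    # A large_string column whose final int64 offset is far below 2**31,
    # so the helper can safely cast it to the int32-offset string type.
    chunked = pa.chunked_array([pa.array(["x", "yz"], type=pa.large_string())])
    trimmed = _trim_pyarrow_large_arrays(chunked)
    assert trimmed.type == pa.string()

    # Columns that are not large_string/large_binary pass through untouched.
    ints = pa.chunked_array([pa.array([1, 2, 3])])
    assert _trim_pyarrow_large_arrays(ints) is ints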
