Skip to content

Commit

Permalink
use polars df fixture
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinjqliu committed Sep 12, 2024
1 parent 4b9ade1 commit 62a923a
Showing 1 changed file with 18 additions and 21 deletions.
39 changes: 18 additions & 21 deletions py-polars/tests/unit/io/test_iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import polars as pl
from polars.io.iceberg import _convert_predicate, _to_ast
from polars.testing import assert_frame_equal


@pytest.fixture
Expand Down Expand Up @@ -167,36 +168,32 @@ def test_parse_lteq(self) -> None:

@pytest.mark.slow
@pytest.mark.write_disk
@pytest.mark.filterwarnings(
"ignore:No preferred file implementation for scheme*:UserWarning"
)
def test_write_iceberg(tmp_path: Path) -> None:
df = pl.DataFrame(
{
"foo": [1, 2, 3, 4, 5],
"bar": [6, 7, 8, 9, 10],
"ham": ["a", "b", "c", "d", "e"],
}
)

@pytest.mark.filterwarnings("ignore:Delete operation did not match any records")
def test_write_iceberg(df: pl.DataFrame, tmp_path: Path) -> None:
from pyiceberg.catalog.sql import SqlCatalog

# time64[ns] type is currently not supported in pyiceberg.
df = df.drop("time")

# in-memory catalog
catalog = SqlCatalog(
"default", uri="sqlite:///:memory:", warehouse=f"file://{tmp_path}"
)
catalog.create_namespace("default")
catalog.create_namespace("foo")
table = catalog.create_table(
"default.table",
"foo.bar",
schema=df.to_arrow().schema,
)

df.write_iceberg(table, mode="overwrite")
new_df = pl.scan_iceberg(table).collect()
assert df.schema == new_df.schema
assert len(df) == len(new_df)
assert df.equals(new_df)
actual = pl.scan_iceberg(table).collect()

# Enum & Categorical types are not currently supported in pyiceberg.
assert_frame_equal(df, actual, check_dtypes=False)

# append on top of already written data, expecting twice the data
df.write_iceberg(table, mode="append")
new_df = pl.scan_iceberg(table).collect()
assert df.schema == new_df.schema
assert 2 * len(df) == len(new_df)
# double the `df` by vertically stacking the dataframe on top of itself
expected = df.vstack(df)
actual = pl.scan_iceberg(table).collect()
assert_frame_equal(expected, actual, check_dtypes=False)

0 comments on commit 62a923a

Please sign in to comment.