From a1a93b69d719e72de1c92ce34a04a04e5c802e2a Mon Sep 17 00:00:00 2001 From: alexander-beedie Date: Mon, 25 Sep 2023 22:01:25 +0400 Subject: [PATCH 1/5] feat(python): improve `write_database`, accounting for latest `adbc` fixes/updates --- py-polars/polars/dataframe/frame.py | 87 +++++++++++++------ py-polars/pyproject.toml | 2 +- py-polars/requirements-dev.txt | 1 + .../tests/unit/io/test_database_write.py | 74 +++++++--------- 4 files changed, 96 insertions(+), 68 deletions(-) diff --git a/py-polars/polars/dataframe/frame.py b/py-polars/polars/dataframe/frame.py index dff1c47e41d6..f9c0f42761d8 100644 --- a/py-polars/polars/dataframe/frame.py +++ b/py-polars/polars/dataframe/frame.py @@ -3434,30 +3434,57 @@ def write_database( Parameters ---------- table_name - Name of the table to create or append to in the target SQL database. - If your table name contains special characters, it should be quoted. + Schema-qualified name of the table to create or append to in the target + SQL database. If your table name contains special characters, it should + be quoted. connection Connection URI string, for example: * "postgresql://user:pass@server:port/database" * "sqlite:////path/to/database.db" if_exists : {'append', 'replace', 'fail'} - The insert mode. - 'replace' will create a new database table, overwriting an existing one. - 'append' will append to an existing table. - 'fail' will fail if table already exists. + The insert mode: + + * 'replace' will create a new database table, overwriting an existing one. + * 'append' will append to an existing table. + * 'fail' will fail if table already exists. engine : {'sqlalchemy', 'adbc'} Select the engine used for writing the data. """ from polars.io.database import _open_adbc_connection + def unpack_table_name(name: str) -> tuple[str | None, str]: + """Unpack optionally qualified table name into schema/table pair.""" + from csv import reader as delimited_read + + table_ident = next(delimited_read([name], delimiter=".")) + if len(table_ident) > 2: + raise ValueError(f"`table_name` appears to be invalid: {name!r}") + elif len(table_ident) > 1: + schema = table_ident[0] + tbl = table_ident[1] + else: + schema = None + tbl = table_ident[0] + return schema, tbl + if engine == "adbc": + import adbc_driver_manager + + adbc_version = parse_version( + getattr(adbc_driver_manager, "__version__", "0.0") + ) if if_exists == "fail": - raise NotImplementedError( - "`if_exists = 'fail'` not supported for ADBC engine" - ) - elif if_exists == "replace": + # if the table exists, 'create' will raise an error, + # resulting in behaviour equivalent to 'fail' mode = "create" + elif if_exists == "replace": + if adbc_version < (0, 7): + adbc_str_version = ".".join(str(v) for v in adbc_version) + raise ModuleNotFoundError( + f"`if_exists = 'replace'` requires ADBC version >= 0.7, found {adbc_str_version}" + ) + mode = "replace" elif if_exists == "append": mode = "append" else: @@ -3465,16 +3492,35 @@ def write_database( f"unexpected value for `if_exists`: {if_exists!r}" f"\n\nChoose one of {{'fail', 'replace', 'append'}}" ) + with _open_adbc_connection(connection) as conn, conn.cursor() as cursor: - cursor.adbc_ingest(table_name, self.to_arrow(), mode) + if adbc_version >= (0, 7): + db_schema, unpacked_table_name = unpack_table_name(table_name) + if "sqlite" in conn.adbc_get_info()["driver_name"].lower(): + if if_exists == "replace": + # note: adbc doesn't (yet) support 'replace' for sqlite + cursor.execute(f"DROP TABLE IF EXISTS {table_name}") + mode = "create" + catalog, db_schema = db_schema, None + else: + catalog = None + + cursor.adbc_ingest( + unpacked_table_name, + data=self.to_arrow(), + mode=mode, + catalog_name=catalog, + db_schema_name=db_schema, + ) + else: + cursor.adbc_ingest(table_name, self.to_arrow(), mode) conn.commit() elif engine == "sqlalchemy": if parse_version(pd.__version__) < parse_version("1.5"): raise ModuleNotFoundError( - f"writing with engine 'sqlalchemy' requires pandas 1.5.x or higher, found pandas {pd.__version__!r}" + f"writing with engine 'sqlalchemy' requires pandas 1.5.x or higher, found {pd.__version__!r}" ) - try: from sqlalchemy import create_engine except ModuleNotFoundError as exc: @@ -3482,23 +3528,12 @@ def write_database( "sqlalchemy not found" "\n\nInstall Polars with: pip install polars[sqlalchemy]" ) from exc - from csv import reader as delimited_read - - # the table name may also include the db schema; ensure that we identify - # both components and pass them through unquoted (sqlalachemy will quote) - table_ident = next(delimited_read([table_name], delimiter=".")) - if len(table_ident) > 2: - raise ValueError(f"`table_name` appears to be invalid: {table_name!r}") - elif len(table_ident) > 1: - db_schema = table_ident[0] - table_name = table_ident[1] - else: - table_name = table_ident[0] - db_schema = None # ensure conversion to pandas uses the pyarrow extension array option # so that we can make use of the sql/db export without copying data engine_sa = create_engine(connection) + db_schema, table_name = unpack_table_name(table_name) + self.to_pandas(use_pyarrow_extension_array=True).to_sql( name=table_name, schema=db_schema, diff --git a/py-polars/pyproject.toml b/py-polars/pyproject.toml index 8c2da790a236..0d45cd839ff2 100644 --- a/py-polars/pyproject.toml +++ b/py-polars/pyproject.toml @@ -79,7 +79,7 @@ disable_error_code = [ [[tool.mypy.overrides]] module = [ "IPython.*", - "adbc_driver_postgresql.*", + "adbc_driver_manager.*", "adbc_driver_sqlite.*", "arrow_odbc", "backports", diff --git a/py-polars/requirements-dev.txt b/py-polars/requirements-dev.txt index d5950b552d05..06fcebfca9d8 100644 --- a/py-polars/requirements-dev.txt +++ b/py-polars/requirements-dev.txt @@ -25,6 +25,7 @@ backports.zoneinfo; python_version < '3.9' tzdata; platform_system == 'Windows' # Database SQLAlchemy +adbc_driver_manager; python_version >= '3.9' and platform_system != 'Windows' adbc_driver_sqlite; python_version >= '3.9' and platform_system != 'Windows' # TODO: Remove version constraint for connectorx when Python 3.12 is supported: # https://github.com/sfu-db/connector-x/issues/527 diff --git a/py-polars/tests/unit/io/test_database_write.py b/py-polars/tests/unit/io/test_database_write.py index 4b19c1c4bb2d..067bfcf3d552 100644 --- a/py-polars/tests/unit/io/test_database_write.py +++ b/py-polars/tests/unit/io/test_database_write.py @@ -1,10 +1,10 @@ from __future__ import annotations import sys -from contextlib import suppress -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING import pytest +from adbc_driver_manager import InternalError import polars as pl from polars.testing import assert_frame_equal @@ -15,16 +15,8 @@ from polars.type_aliases import DbWriteEngine -def adbc_sqlite_driver_version(*args: Any, **kwargs: Any) -> str: - with suppress(ModuleNotFoundError): # not available on 3.8/windows - import adbc_driver_sqlite - - return getattr(adbc_driver_sqlite, "__version__", "n/a") - return "n/a" - - @pytest.mark.skipif( - sys.version_info > (3, 11), + sys.version_info >= (3, 12), reason="connectorx cannot be installed on Python 3.12 yet.", ) @pytest.mark.skipif( @@ -43,20 +35,20 @@ def test_write_database_create(engine: DbWriteEngine, tmp_path: Path) -> None: ) tmp_path.mkdir(exist_ok=True) test_db = str(tmp_path / f"test_{engine}.db") + test_db_uri = f"sqlite:///{test_db}" table_name = "test_create" df.write_database( table_name=table_name, - connection=f"sqlite:///{test_db}", - if_exists="replace", + connection=test_db_uri, engine=engine, ) - result = pl.read_database_uri(f"SELECT * FROM {table_name}", f"sqlite:///{test_db}") + result = pl.read_database_uri(f"SELECT * FROM {table_name}", test_db_uri) assert_frame_equal(result, df) @pytest.mark.skipif( - sys.version_info > (3, 11), + sys.version_info >= (3, 12), reason="connectorx cannot be installed on Python 3.12 yet.", ) @pytest.mark.skipif( @@ -65,7 +57,7 @@ def test_write_database_create(engine: DbWriteEngine, tmp_path: Path) -> None: ) @pytest.mark.write_disk() @pytest.mark.parametrize("engine", ["adbc", "sqlalchemy"]) -def test_write_database_append(engine: DbWriteEngine, tmp_path: Path) -> None: +def test_write_database_append_replace(engine: DbWriteEngine, tmp_path: Path) -> None: df = pl.DataFrame( { "key": ["xx", "yy", "zz"], @@ -76,31 +68,40 @@ def test_write_database_append(engine: DbWriteEngine, tmp_path: Path) -> None: tmp_path.mkdir(exist_ok=True) test_db = str(tmp_path / f"test_{engine}.db") + test_db_uri = f"sqlite:///{test_db}" table_name = "test_append" df.write_database( table_name=table_name, - connection=f"sqlite:///{test_db}", - if_exists="replace", + connection=test_db_uri, engine=engine, ) - ExpectedError = NotImplementedError if engine == "adbc" else ValueError + ExpectedError = InternalError if engine == "adbc" else ValueError with pytest.raises(ExpectedError): df.write_database( table_name=table_name, - connection=f"sqlite:///{test_db}", + connection=test_db_uri, if_exists="fail", engine=engine, ) df.write_database( table_name=table_name, - connection=f"sqlite:///{test_db}", + connection=test_db_uri, + if_exists="replace", + engine=engine, + ) + result = pl.read_database_uri(f"SELECT * FROM {table_name}", test_db_uri) + assert_frame_equal(result, df) + + df.write_database( + table_name=table_name, + connection=test_db_uri, if_exists="append", engine=engine, ) - result = pl.read_database_uri(f"SELECT * FROM {table_name}", f"sqlite:///{test_db}") + result = pl.read_database_uri(f"SELECT * FROM {table_name}", test_db_uri) assert_frame_equal(result, pl.concat([df, df])) @@ -112,16 +113,11 @@ def test_write_database_append(engine: DbWriteEngine, tmp_path: Path) -> None: @pytest.mark.parametrize( "engine", [ - pytest.param( - "adbc", - marks=pytest.mark.xfail( # see: https://github.com/apache/arrow-adbc/issues/1000 - reason="ADBC SQLite driver has a bug with quoted/qualified table names", - ), - ), + "adbc", pytest.param( "sqlalchemy", marks=pytest.mark.skipif( - sys.version_info > (3, 11), + sys.version_info >= (3, 12), reason="connectorx cannot be installed on Python 3.12 yet.", ), ), @@ -134,17 +130,23 @@ def test_write_database_create_quoted_tablename( tmp_path.mkdir(exist_ok=True) test_db = str(tmp_path / f"test_{engine}.db") + test_db_uri = f"sqlite:///{test_db}" # table name requires quoting, and is qualified with the implicit 'main' schema table_name = 'main."test-append"' df.write_database( table_name=table_name, - connection=f"sqlite:///{test_db}", + connection=test_db_uri, + engine=engine, + ) + df.write_database( + table_name=table_name, + connection=test_db_uri, if_exists="replace", engine=engine, ) - result = pl.read_database_uri(f"SELECT * FROM {table_name}", f"sqlite:///{test_db}") + result = pl.read_database_uri(f"SELECT * FROM {table_name}", test_db_uri) assert_frame_equal(result, df) @@ -159,16 +161,6 @@ def test_write_database_errors() -> None: connection="sqlite:///:memory:", table_name="w.x.y.z", engine="sqlalchemy" ) - with pytest.raises( - NotImplementedError, match="`if_exists = 'fail'` not supported for ADBC engine" - ): - df.write_database( - connection="sqlite:///:memory:", - table_name="test_errs", - if_exists="fail", - engine="adbc", - ) - with pytest.raises(ValueError, match="'do_something' is not valid for if_exists"): df.write_database( connection="sqlite:///:memory:", From 87d7965f93d9f139cd63f3fb5d631f669cdf25ca Mon Sep 17 00:00:00 2001 From: Alexander Beedie Date: Mon, 27 Nov 2023 12:41:26 +0000 Subject: [PATCH 2/5] lint: (windows/py3.8) --- py-polars/polars/dataframe/frame.py | 15 +++++++++++---- py-polars/tests/unit/io/test_database_write.py | 3 ++- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/py-polars/polars/dataframe/frame.py b/py-polars/polars/dataframe/frame.py index f9c0f42761d8..77ff0ce617ca 100644 --- a/py-polars/polars/dataframe/frame.py +++ b/py-polars/polars/dataframe/frame.py @@ -3469,11 +3469,18 @@ def unpack_table_name(name: str) -> tuple[str | None, str]: return schema, tbl if engine == "adbc": - import adbc_driver_manager + try: + import adbc_driver_manager + + adbc_version = parse_version( + getattr(adbc_driver_manager, "__version__", "0.0") + ) + except ModuleNotFoundError as exc: + raise ModuleNotFoundError( + "adbc_driver_manager not found" + "\n\nInstall Polars with: pip install adbc_driver_manager" + ) from exc - adbc_version = parse_version( - getattr(adbc_driver_manager, "__version__", "0.0") - ) if if_exists == "fail": # if the table exists, 'create' will raise an error, # resulting in behaviour equivalent to 'fail' diff --git a/py-polars/tests/unit/io/test_database_write.py b/py-polars/tests/unit/io/test_database_write.py index 067bfcf3d552..aa147555d128 100644 --- a/py-polars/tests/unit/io/test_database_write.py +++ b/py-polars/tests/unit/io/test_database_write.py @@ -4,7 +4,6 @@ from typing import TYPE_CHECKING import pytest -from adbc_driver_manager import InternalError import polars as pl from polars.testing import assert_frame_equal @@ -58,6 +57,8 @@ def test_write_database_create(engine: DbWriteEngine, tmp_path: Path) -> None: @pytest.mark.write_disk() @pytest.mark.parametrize("engine", ["adbc", "sqlalchemy"]) def test_write_database_append_replace(engine: DbWriteEngine, tmp_path: Path) -> None: + from adbc_driver_manager import InternalError + df = pl.DataFrame( { "key": ["xx", "yy", "zz"], From eb679b04a5fc1b70c208915c34d2211b78fac858 Mon Sep 17 00:00:00 2001 From: Alexander Beedie Date: Mon, 27 Nov 2023 13:01:39 +0000 Subject: [PATCH 3/5] enable all `write_database` tests to run under py3.12 --- py-polars/polars/io/database.py | 4 +- py-polars/polars/type_aliases.py | 4 +- .../tests/unit/io/test_database_write.py | 50 +++++++++---------- 3 files changed, 29 insertions(+), 29 deletions(-) diff --git a/py-polars/polars/io/database.py b/py-polars/polars/io/database.py index b2bb921b3e16..823535e74f73 100644 --- a/py-polars/polars/io/database.py +++ b/py-polars/polars/io/database.py @@ -190,10 +190,10 @@ def _normalise_cursor(self, conn: ConnectionOrCursor) -> Cursor: if conn.driver == "databricks-sql-python": # type: ignore[union-attr] # take advantage of the raw connection to get arrow integration self.driver_name = "databricks" - return conn.raw_connection().cursor() # type: ignore[union-attr] + return conn.raw_connection().cursor() # type: ignore[union-attr, return-value] else: # sqlalchemy engine; direct use is deprecated, so prefer the connection - return conn.connect() # type: ignore[union-attr] + return conn.connect() # type: ignore[union-attr, return-value] elif hasattr(conn, "cursor"): # connection has a dedicated cursor; prefer over direct execute diff --git a/py-polars/polars/type_aliases.py b/py-polars/polars/type_aliases.py index a1bbb246b1bc..4c2cdace9bc6 100644 --- a/py-polars/polars/type_aliases.py +++ b/py-polars/polars/type_aliases.py @@ -21,6 +21,8 @@ if TYPE_CHECKING: import sys + from sqlalchemy import Engine + from polars import DataFrame, Expr, LazyFrame, Series from polars.datatypes import DataType, DataTypeClass, IntegerType, TemporalType from polars.dependencies import numpy as np @@ -233,4 +235,4 @@ def fetchmany(self, *args: Any, **kwargs: Any) -> Any: """Fetch results in batches.""" -ConnectionOrCursor = Union[BasicConnection, BasicCursor, Cursor] +ConnectionOrCursor = Union[BasicConnection, BasicCursor, Cursor, "Engine"] diff --git a/py-polars/tests/unit/io/test_database_write.py b/py-polars/tests/unit/io/test_database_write.py index aa147555d128..947730095c90 100644 --- a/py-polars/tests/unit/io/test_database_write.py +++ b/py-polars/tests/unit/io/test_database_write.py @@ -4,6 +4,7 @@ from typing import TYPE_CHECKING import pytest +from sqlalchemy import create_engine import polars as pl from polars.testing import assert_frame_equal @@ -14,10 +15,6 @@ from polars.type_aliases import DbWriteEngine -@pytest.mark.skipif( - sys.version_info >= (3, 12), - reason="connectorx cannot be installed on Python 3.12 yet.", -) @pytest.mark.skipif( sys.version_info < (3, 9) or sys.platform == "win32", reason="adbc_driver_sqlite not available below Python 3.9 / on Windows", @@ -42,14 +39,13 @@ def test_write_database_create(engine: DbWriteEngine, tmp_path: Path) -> None: connection=test_db_uri, engine=engine, ) - result = pl.read_database_uri(f"SELECT * FROM {table_name}", test_db_uri) + result = pl.read_database( + query=f"SELECT * FROM {table_name}", + connection=create_engine(test_db_uri), + ) assert_frame_equal(result, df) -@pytest.mark.skipif( - sys.version_info >= (3, 12), - reason="connectorx cannot be installed on Python 3.12 yet.", -) @pytest.mark.skipif( sys.version_info < (3, 9) or sys.platform == "win32", reason="adbc_driver_sqlite not available below Python 3.9 / on Windows", @@ -93,7 +89,10 @@ def test_write_database_append_replace(engine: DbWriteEngine, tmp_path: Path) -> if_exists="replace", engine=engine, ) - result = pl.read_database_uri(f"SELECT * FROM {table_name}", test_db_uri) + result = pl.read_database( + query=f"SELECT * FROM {table_name}", + connection=create_engine(test_db_uri), + ) assert_frame_equal(result, df) df.write_database( @@ -102,7 +101,10 @@ def test_write_database_append_replace(engine: DbWriteEngine, tmp_path: Path) -> if_exists="append", engine=engine, ) - result = pl.read_database_uri(f"SELECT * FROM {table_name}", test_db_uri) + result = pl.read_database( + query=f"SELECT * FROM {table_name}", + connection=create_engine(test_db_uri), + ) assert_frame_equal(result, pl.concat([df, df])) @@ -113,16 +115,7 @@ def test_write_database_append_replace(engine: DbWriteEngine, tmp_path: Path) -> @pytest.mark.write_disk() @pytest.mark.parametrize( "engine", - [ - "adbc", - pytest.param( - "sqlalchemy", - marks=pytest.mark.skipif( - sys.version_info >= (3, 12), - reason="connectorx cannot be installed on Python 3.12 yet.", - ), - ), - ], + ["adbc", "sqlalchemy"], ) def test_write_database_create_quoted_tablename( engine: DbWriteEngine, tmp_path: Path @@ -134,20 +127,23 @@ def test_write_database_create_quoted_tablename( test_db_uri = f"sqlite:///{test_db}" # table name requires quoting, and is qualified with the implicit 'main' schema - table_name = 'main."test-append"' + qualified_table_name = 'main."test-append"' df.write_database( - table_name=table_name, + table_name=qualified_table_name, connection=test_db_uri, engine=engine, ) df.write_database( - table_name=table_name, + table_name=qualified_table_name, connection=test_db_uri, if_exists="replace", engine=engine, ) - result = pl.read_database_uri(f"SELECT * FROM {table_name}", test_db_uri) + result = pl.read_database( + query=f"SELECT * FROM {qualified_table_name}", + connection=create_engine(test_db_uri), + ) assert_frame_equal(result, df) @@ -159,7 +155,9 @@ def test_write_database_errors() -> None: ValueError, match="`table_name` appears to be invalid: 'w.x.y.z'" ): df.write_database( - connection="sqlite:///:memory:", table_name="w.x.y.z", engine="sqlalchemy" + connection="sqlite:///:memory:", + table_name="w.x.y.z", + engine="sqlalchemy", ) with pytest.raises(ValueError, match="'do_something' is not valid for if_exists"): From a7f8de4710bad4bfb7f8a3fbc5c484fdff573360 Mon Sep 17 00:00:00 2001 From: alexander-beedie Date: Mon, 27 Nov 2023 18:42:02 +0400 Subject: [PATCH 4/5] minor test generalisation --- py-polars/tests/unit/io/test_database_write.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/py-polars/tests/unit/io/test_database_write.py b/py-polars/tests/unit/io/test_database_write.py index 947730095c90..8a5c015c0793 100644 --- a/py-polars/tests/unit/io/test_database_write.py +++ b/py-polars/tests/unit/io/test_database_write.py @@ -53,8 +53,6 @@ def test_write_database_create(engine: DbWriteEngine, tmp_path: Path) -> None: @pytest.mark.write_disk() @pytest.mark.parametrize("engine", ["adbc", "sqlalchemy"]) def test_write_database_append_replace(engine: DbWriteEngine, tmp_path: Path) -> None: - from adbc_driver_manager import InternalError - df = pl.DataFrame( { "key": ["xx", "yy", "zz"], @@ -66,7 +64,7 @@ def test_write_database_append_replace(engine: DbWriteEngine, tmp_path: Path) -> tmp_path.mkdir(exist_ok=True) test_db = str(tmp_path / f"test_{engine}.db") test_db_uri = f"sqlite:///{test_db}" - table_name = "test_append" + table_name = f"test_append_{engine}" df.write_database( table_name=table_name, @@ -74,8 +72,7 @@ def test_write_database_append_replace(engine: DbWriteEngine, tmp_path: Path) -> engine=engine, ) - ExpectedError = InternalError if engine == "adbc" else ValueError - with pytest.raises(ExpectedError): + with pytest.raises(Exception): # noqa: B017 df.write_database( table_name=table_name, connection=test_db_uri, @@ -127,7 +124,7 @@ def test_write_database_create_quoted_tablename( test_db_uri = f"sqlite:///{test_db}" # table name requires quoting, and is qualified with the implicit 'main' schema - qualified_table_name = 'main."test-append"' + qualified_table_name = f'main."test-append-{engine}"' df.write_database( table_name=qualified_table_name, From 6fc3005ba38fe2b57dcfd2d731ae9f4514afa9e9 Mon Sep 17 00:00:00 2001 From: Alexander Beedie Date: Tue, 28 Nov 2023 08:21:50 +0000 Subject: [PATCH 5/5] detect/raise on version-specific ADBC issues --- py-polars/polars/dataframe/frame.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/py-polars/polars/dataframe/frame.py b/py-polars/polars/dataframe/frame.py index 77ff0ce617ca..5a6d7243809f 100644 --- a/py-polars/polars/dataframe/frame.py +++ b/py-polars/polars/dataframe/frame.py @@ -3501,8 +3501,8 @@ def unpack_table_name(name: str) -> tuple[str | None, str]: ) with _open_adbc_connection(connection) as conn, conn.cursor() as cursor: + db_schema, unpacked_table_name = unpack_table_name(table_name) if adbc_version >= (0, 7): - db_schema, unpacked_table_name = unpack_table_name(table_name) if "sqlite" in conn.adbc_get_info()["driver_name"].lower(): if if_exists == "replace": # note: adbc doesn't (yet) support 'replace' for sqlite @@ -3519,8 +3519,15 @@ def unpack_table_name(name: str) -> tuple[str | None, str]: catalog_name=catalog, db_schema_name=db_schema, ) + elif db_schema is not None: + adbc_str_version = ".".join(str(v) for v in adbc_version) + raise ModuleNotFoundError( + # https://github.com/apache/arrow-adbc/issues/1000 + # https://github.com/apache/arrow-adbc/issues/1109 + f"use of schema-qualified table names requires ADBC version >= 0.8, found {adbc_str_version}" + ) else: - cursor.adbc_ingest(table_name, self.to_arrow(), mode) + cursor.adbc_ingest(unpacked_table_name, self.to_arrow(), mode) conn.commit() elif engine == "sqlalchemy":