Skip to content

Commit

Permalink
pantab 4.0 with C++/nanobind and friends
Browse files Browse the repository at this point in the history
  • Loading branch information
WillAyd committed Jan 8, 2024
1 parent f91465c commit 734c575
Show file tree
Hide file tree
Showing 24 changed files with 850 additions and 1,773 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
.mypy_cache
*.hyper
hyper_db*
compile_commands.json

#########################################
# Editor temporary/working/backup files #
Expand Down
35 changes: 35 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Top-level build for the pantab native extension (nanobind + Tableau Hyper API).
#
# cmake_minimum_required() must be the FIRST command: it establishes policy
# defaults that project() relies on. The minimum is 3.24 (not 3.18) because
# FetchContent_Declare(... FIND_PACKAGE_ARGS ...) below was added in 3.24.
cmake_minimum_required(VERSION 3.24...3.28)
project(pantab LANGUAGES CXX)

# Prefer the portable standard knobs over appending "-std=c++20" to
# CMAKE_CXX_FLAGS; EXTENSIONS OFF yields -std=c++20 rather than -std=gnu++20.
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)
add_compile_options(-Wall)

# Opt-in ASan/UBSan builds; declared here so it shows up in the cache/GUI.
option(PANTAB_USE_SANITIZERS "Build with AddressSanitizer and UBSanitizer" OFF)

# Default to a Release build for single-config generators only
# (CMAKE_CONFIGURATION_TYPES is set under multi-config generators).
if(NOT CMAKE_BUILD_TYPE AND NOT CMAKE_CONFIGURATION_TYPES)
  set(CMAKE_BUILD_TYPE Release CACHE STRING "Choose the type of build." FORCE)
  set_property(CACHE CMAKE_BUILD_TYPE PROPERTY STRINGS
               "Debug" "Release" "MinSizeRel" "RelWithDebInfo")
endif()

find_package(Python COMPONENTS Interpreter Development.Module NumPy REQUIRED)

# Detect the installed nanobind package and import it into CMake.
# Fail fast with an actionable message if nanobind is not importable;
# otherwise NB_DIR would be empty and find_package would fail cryptically.
execute_process(
  COMMAND "${Python_EXECUTABLE}" -m nanobind --cmake_dir
  OUTPUT_STRIP_TRAILING_WHITESPACE
  OUTPUT_VARIABLE NB_DIR
  RESULT_VARIABLE NB_RESULT)
if(NOT NB_RESULT EQUAL 0)
  message(FATAL_ERROR
          "Could not locate nanobind: '${Python_EXECUTABLE} -m nanobind "
          "--cmake_dir' failed. Is nanobind installed in this environment?")
endif()
list(APPEND CMAKE_PREFIX_PATH "${NB_DIR}")
find_package(nanobind CONFIG REQUIRED)

# Fetch the Tableau Hyper API unless a local package is already available.
# NOTE(review): the archive URL is Linux x86_64 only — other platforms must
# provide the package via FIND_PACKAGE_ARGS / CMAKE_PREFIX_PATH.
include(FetchContent)
FetchContent_Declare(
  tableauhyperapi-cxx
  FIND_PACKAGE_ARGS NAMES Tableau
  URL "https://downloads.tableau.com/tssoftware/tableauhyperapi-cxx-linux-x86_64-release-main.0.0.18369.r86e960ca.zip"
)

FetchContent_MakeAvailable(tableauhyperapi-cxx)
list(APPEND CMAKE_PREFIX_PATH "${tableauhyperapi-cxx_SOURCE_DIR}/share/cmake")
find_package(tableauhyperapi-cxx CONFIG REQUIRED)

if(PANTAB_USE_SANITIZERS)
  add_compile_options(-fsanitize=address -fsanitize=undefined)
  add_link_options(-fsanitize=address -fsanitize=undefined)
endif()

add_subdirectory(pantab/src)
3 changes: 2 additions & 1 deletion environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ dependencies:
- black
- flake8
- isort
- meson-python
- mypy
- nanobind
- pandas
- pip
- pyarrow
- python
- pytest
- scikit-build-core
- sphinx
- pre-commit
- sphinx_rtd_theme
Expand Down
113 changes: 1 addition & 112 deletions pantab/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
__version__ = "3.0.3"
__version__ = "4.0.0"

import libpantab # type: ignore
from tableauhyperapi import __version__ as hyperapi_version

from ._reader import frame_from_hyper, frame_from_hyper_query, frames_from_hyper
from ._tester import test
Expand All @@ -16,112 +14,3 @@
"frames_to_hyper",
"test",
]

# We link against HyperAPI in a fun way: In Python, we extract the function
# pointers directly from the Python HyperAPI. We pass those function pointers
# over to the C module which will then use those pointers to directly interact
# with HyperAPI. Furthermore, we check the function signatures to guard
# against API-breaking changes in HyperAPI.
#
# Directly using HyperAPI's C functions always was and still is discouraged and
# unsupported by Tableau. In particular, Tableau will not be able to provide
# official support for this hack.
#
# Because this is highly brittle, we try to make the error message as
# actionable as possible and guide users in the right direction.

api_incompatibility_msg = """
pantab is incompatible with version {} of Tableau Hyper API. Please upgrade
both `tableauhyperapi` and `pantab` to the latest version. See also
https://pantab.readthedocs.io/en/latest/caveats.html#tableauhyperapi-compatability
""".format(
hyperapi_version
)

try:
from tableauhyperapi.impl.dll import ffi, lib
except ImportError as e:
raise NotImplementedError(api_incompatibility_msg) from e


def _check_compatibility(check, message):
if not check:
raise NotImplementedError(message + "\n" + api_incompatibility_msg)


def _get_hapi_function(name, sig):
_check_compatibility(hasattr(lib, name), f"function '{name}' missing")
f = getattr(lib, name)
func_type = ffi.typeof(f)
_check_compatibility(
func_type.kind == "function",
f"expected '{name}' to be a function, got {func_type.kind}",
)
_check_compatibility(
func_type.cname == sig,
f"expected '{name}' to have the signature '{sig}', got '{func_type.cname}'",
)
return f


libpantab.load_hapi_functions(
_get_hapi_function("hyper_decode_date", "hyper_date_components_t(*)(uint32_t)"),
_get_hapi_function("hyper_encode_date", "uint32_t(*)(hyper_date_components_t)"),
_get_hapi_function("hyper_decode_time", "hyper_time_components_t(*)(uint64_t)"),
_get_hapi_function("hyper_encode_time", "uint64_t(*)(hyper_time_components_t)"),
_get_hapi_function(
"hyper_inserter_buffer_add_null",
"struct hyper_error_t *(*)(struct hyper_inserter_buffer_t *)",
),
_get_hapi_function(
"hyper_inserter_buffer_add_bool",
"struct hyper_error_t *(*)(struct hyper_inserter_buffer_t *, _Bool)",
),
_get_hapi_function(
"hyper_inserter_buffer_add_int16",
"struct hyper_error_t *(*)(struct hyper_inserter_buffer_t *, int16_t)",
),
_get_hapi_function(
"hyper_inserter_buffer_add_int32",
"struct hyper_error_t *(*)(struct hyper_inserter_buffer_t *, int32_t)",
),
_get_hapi_function(
"hyper_inserter_buffer_add_int64",
"struct hyper_error_t *(*)(struct hyper_inserter_buffer_t *, int64_t)",
),
_get_hapi_function(
"hyper_inserter_buffer_add_double",
"struct hyper_error_t *(*)(struct hyper_inserter_buffer_t *, double)",
),
_get_hapi_function(
"hyper_inserter_buffer_add_binary",
(
"struct hyper_error_t *(*)"
"(struct hyper_inserter_buffer_t *, uint8_t *, size_t)"
),
),
_get_hapi_function(
"hyper_inserter_buffer_add_raw",
(
"struct hyper_error_t *(*)(struct hyper_inserter_buffer_t *"
", uint8_t *, size_t)"
),
),
_get_hapi_function(
"hyper_rowset_get_next_chunk",
(
"struct hyper_error_t *(*)(struct hyper_rowset_t *"
", struct hyper_rowset_chunk_t * *)"
),
),
_get_hapi_function(
"hyper_destroy_rowset_chunk", "void(*)(struct hyper_rowset_chunk_t *)"
),
_get_hapi_function(
"hyper_rowset_chunk_field_values",
(
"void(*)(struct hyper_rowset_chunk_t *"
", size_t *, size_t *, uint8_t * * *, size_t * *)"
),
),
)
161 changes: 37 additions & 124 deletions pantab/_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,147 +3,72 @@
import tempfile
from typing import Dict, Optional, Union

import libpantab # type: ignore
import numpy as np
import pandas as pd
import tableauhyperapi as tab_api

import pantab._types as pantab_types
from pantab._hyper_util import ensure_hyper_process, forbid_hyper_process
import pantab.src.pantab as libpantab # type: ignore

TableType = Union[str, tab_api.Name, tab_api.TableName]


def _read_query_result(
result: tab_api.Result, dtypes: Optional[Dict[str, str]], use_float_na: bool
) -> pd.DataFrame:
if dtypes is None:
dtypes = {}
# Construct data types from result
for column in result.schema.columns:
# `result.schema` does not provide nullability information.
# Let's err on the safe side and always assume they are nullable
nullability = tab_api.Nullability.NULLABLE
column_type = pantab_types._ColumnType(column.type, nullability)
try:
dtypes[column.name.unescaped] = pantab_types._get_pandas_type(
column_type
)
except KeyError as e:
raise TypeError(
f"Column {column.name} has unsupported datatype {column.type} "
f"with nullability {column.nullability}"
) from e

# if the use_float_na flag is set to False
# then switch Float32/Float64 dtypes back to float32/float64
# to support np.nan rather than pd.NA
if not use_float_na:
for column, col_type in dtypes.items():
if col_type == "Float64":
dtypes[column] = "float64"
elif col_type == "Float32":
dtypes[column] = "float32"

def _read_query(path, query) -> pd.DataFrame:
# Call native library to read tuples from result set
dtype_strs = tuple(dtypes.values())
df = pd.DataFrame(libpantab.read_hyper_query(result._Result__cdata, dtype_strs))
if df.empty:
return pd.DataFrame({col: pd.Series(dtype="object") for col in dtypes})
df.columns = dtypes.keys()
# TODO: remove this hackery...
for k, v in dtypes.items():
if v == "date":
dtypes[k] = "datetime64[ns]"
date_types = ["datetime64[ns, UTC]", "datetime64[ns]"]
for col in df.select_dtypes(include=date_types):
df[col] = df[col].dt.tz_localize(None)
for col in df.select_dtypes(exclude=date_types):
df[col] = df[col].astype(dtypes[col])

df = df.fillna(value=np.nan) # Replace any appearances of None
df = pd.DataFrame(libpantab.read_from_hyper_query(path, query))
data, columns, dtypes = libpantab.read_from_hyper_query(str(path), query)
df = pd.DataFrame(data, columns=columns)
dtype_map = {k: v for k, v in zip(columns, dtypes)}
df = df.astype(dtype_map)

return df


def _read_table(
*, connection: tab_api.Connection, table: TableType, use_float_na: bool
) -> pd.DataFrame:
if isinstance(table, str):
table = tab_api.TableName(table)
def _read_table(*, path, table: TableType) -> pd.DataFrame:
if isinstance(table, (str, tab_api.Name)) or not table.schema_name:
table = tab_api.TableName("public", table)

table_def = connection.catalog.get_table_definition(table)
columns = table_def.columns
data, columns, dtypes = libpantab.read_from_hyper_table(
str(path),
table.schema_name.name.unescaped, # TODO: this probably allows injection
table.name.unescaped,
)
df = pd.DataFrame(data, columns=columns)
dtype_map = {k: v for k, v in zip(columns, dtypes)}
df = df.astype(dtype_map)

dtypes: Dict[str, str] = {}
for column in columns:
column_type = pantab_types._ColumnType(column.type, column.nullability)
try:
dtypes[column.name.unescaped] = pantab_types._get_pandas_type(column_type)
except KeyError as e:
raise TypeError(
f"Column {column.name} has unsupported datatype {column.type} "
f"with nullability {column.nullability}"
) from e

query = f"SELECT * from {table}"
with connection.execute_query(query) as result:
return _read_query_result(result, dtypes, use_float_na)
return df


def frame_from_hyper(
source: Union[str, pathlib.Path, tab_api.Connection],
*,
table: TableType,
hyper_process: Optional[tab_api.HyperProcess] = None,
use_float_na: bool = False,
) -> pd.DataFrame:
"""See api.rst for documentation"""

if isinstance(source, tab_api.Connection):
forbid_hyper_process(hyper_process)
return _read_table(connection=source, table=table, use_float_na=use_float_na)
else:
with tempfile.TemporaryDirectory() as tmp_dir, ensure_hyper_process(
hyper_process
) as hpe:
tmp_db = shutil.copy(source, tmp_dir)
with tab_api.Connection(hpe.endpoint, tmp_db) as connection:
return _read_table(
connection=connection, table=table, use_float_na=use_float_na
)
return _read_table(path=source, table=table)


def frames_from_hyper(
source: Union[str, pathlib.Path, tab_api.Connection],
*,
hyper_process: Optional[tab_api.HyperProcess] = None,
use_float_na: bool = False,
source: Union[str, pathlib.Path],
) -> Dict[tab_api.TableName, pd.DataFrame]:
"""See api.rst for documentation."""
result: Dict[TableType, pd.DataFrame] = {}

if isinstance(source, tab_api.Connection):
forbid_hyper_process(hyper_process)
connection = source
for schema in connection.catalog.get_schema_names():
for table in connection.catalog.get_table_names(schema=schema):
result[table] = _read_table(
connection=connection, table=table, use_float_na=use_float_na
)
else:
with tempfile.TemporaryDirectory() as tmp_dir, ensure_hyper_process(
hyper_process
) as hpe:
tmp_db = shutil.copy(source, tmp_dir)
with tab_api.Connection(hpe.endpoint, tmp_db) as connection:
for schema in connection.catalog.get_schema_names():
for table in connection.catalog.get_table_names(schema=schema):
result[table] = _read_table(
connection=connection,
table=table,
use_float_na=use_float_na,
)
table_names = []
with tempfile.TemporaryDirectory() as tmp_dir, tab_api.HyperProcess(
tab_api.Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU
) as hpe:
tmp_db = shutil.copy(source, tmp_dir)
with tab_api.Connection(hpe.endpoint, tmp_db) as connection:
for schema in connection.catalog.get_schema_names():
for table in connection.catalog.get_table_names(schema=schema):
table_names.append(table)

for table in table_names:
result[table] = _read_table(
path=source,
table=table,
)

return result

Expand All @@ -153,19 +78,7 @@ def frame_from_hyper_query(
query: str,
*,
hyper_process: Optional[tab_api.HyperProcess] = None,
use_float_na: bool = False,
) -> pd.DataFrame:
"""See api.rst for documentation."""

if isinstance(source, tab_api.Connection):
forbid_hyper_process(hyper_process)
with source.execute_query(query) as result:
return _read_query_result(result, None, use_float_na)
else:
with tempfile.TemporaryDirectory() as tmp_dir, ensure_hyper_process(
hyper_process
) as hpe:
tmp_db = shutil.copy(source, tmp_dir)
with tab_api.Connection(hpe.endpoint, tmp_db) as connection:
with connection.execute_query(query) as result:
return _read_query_result(result, None, use_float_na)
return _read_query(str(source), query)
Loading

0 comments on commit 734c575

Please sign in to comment.