diff --git a/README.md b/README.md index 3c5fd96..ac59628 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,10 @@ ----------------- +> **Info**: NanoCube is under active development and subject to change. +> This current version, although working nicely, should be seen as a first proof of concept. + + **NanoCube** is a minimalistic in-memory, in-process OLAP engine for lightning fast point queries on Pandas DataFrames. NanoCube shines when filtering and/or point queries need to be executed on a DataFrame, e.g. for financial data analysis, business intelligence or fast web services. diff --git a/benchmarks/benchmark_all.py b/benchmarks/benchmark_all.py index 7c592b9..e4afddf 100644 --- a/benchmarks/benchmark_all.py +++ b/benchmarks/benchmark_all.py @@ -418,7 +418,7 @@ def create_query_chart(self, data, size="s", title="Small", subtitle_postfix="") if __name__ == "__main__": # run the benchmark # - rows = 10_000_000 + rows = 54_000_000 Benchmark(max_rows=rows, sorted=False).run() Benchmark(max_rows=rows, sorted=True).run() diff --git a/benchmarks/charts/hk.png b/benchmarks/charts/hk.png index ea9e668..a7748f9 100644 Binary files a/benchmarks/charts/hk.png and b/benchmarks/charts/hk.png differ diff --git a/benchmarks/charts/hk_sorted.png b/benchmarks/charts/hk_sorted.png index 364b0ed..5fad9a1 100644 Binary files a/benchmarks/charts/hk_sorted.png and b/benchmarks/charts/hk_sorted.png differ diff --git a/benchmarks/charts/l.png b/benchmarks/charts/l.png index 2ce7876..d3ec370 100644 Binary files a/benchmarks/charts/l.png and b/benchmarks/charts/l.png differ diff --git a/benchmarks/charts/l_sorted.png b/benchmarks/charts/l_sorted.png index 0986402..70e6d94 100644 Binary files a/benchmarks/charts/l_sorted.png and b/benchmarks/charts/l_sorted.png differ diff --git a/benchmarks/charts/m.png b/benchmarks/charts/m.png index 3321478..83cca18 100644 Binary files a/benchmarks/charts/m.png and b/benchmarks/charts/m.png differ diff --git a/benchmarks/charts/m_sorted.png b/benchmarks/charts/m_sorted.png index d13f1c8..3c8974b 100644 Binary files a/benchmarks/charts/m_sorted.png and b/benchmarks/charts/m_sorted.png differ diff --git a/benchmarks/charts/s.png b/benchmarks/charts/s.png index 95d9c53..a203e11 100644 Binary files a/benchmarks/charts/s.png and b/benchmarks/charts/s.png differ diff --git a/benchmarks/charts/s_sorted.png b/benchmarks/charts/s_sorted.png index 1d256ab..67a2768 100644 Binary files a/benchmarks/charts/s_sorted.png and b/benchmarks/charts/s_sorted.png differ diff --git a/benchmarks/charts/xl.png b/benchmarks/charts/xl.png index 25960dc..921dc46 100644 Binary files a/benchmarks/charts/xl.png and b/benchmarks/charts/xl.png differ diff --git a/benchmarks/charts/xl_sorted.png b/benchmarks/charts/xl_sorted.png index 804c3bd..f175d71 100644 Binary files a/benchmarks/charts/xl_sorted.png and b/benchmarks/charts/xl_sorted.png differ diff --git a/libs/sortednp-main.zip b/libs/sortednp-main.zip deleted file mode 100644 index 0a04e76..0000000 Binary files a/libs/sortednp-main.zip and /dev/null differ diff --git a/nanocube_old/__init__.py b/nanocube_old/__init__.py deleted file mode 100644 index abf38d1..0000000 --- a/nanocube_old/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -# nanocube - Copyright (c)2024, Thomas Zeutschler, MIT license -from nanocube.schema import Schema -from nanocube.nano_index import NanoIndex -from nanocube.nano_cube import NanoCube - -__author__ = "Thomas Zeutschler" -__version__ = "0.2.1" -__license__ = "MIT" -VERSION = __version__ - -__all__ = [ - "NanoCube", - "Schema", - "NanoIndex", -] \ No newline at end of file diff --git a/nanocube_old/nano_cube.py b/nanocube_old/nano_cube.py deleted file mode 100644 index e19c931..0000000 --- a/nanocube_old/nano_cube.py +++ /dev/null @@ -1,226 +0,0 @@ -# nanocube - Copyright (c)2024, Thomas Zeutschler, MIT license -import json -from array import array -import numpy as np -import pandas as pd -from pandas.api.types import is_numeric_dtype, is_bool_dtype, is_float_dtype -from pyroaring import BitMap - -from nanocube.schema import Schema -from nanocube.nano_index import NanoIndex, NanoRoaringIndex, NanoNumpyIndex, IndexingMethod -import lz4.frame -import zstandard as zstd - - -import pyarrow as pa -import pyarrow.parquet as pq - - -class NanoCube: - def __init__(self, df: pd.DataFrame, dimensions: list | None = None, measures:list | None = None, - caching: bool = True, indexing_method: IndexingMethod | str = IndexingMethod.roaring): - """ - Initialize an in-memory OLAP cube for fast point queries upon a Pandas DataFrame. - By default, all non-numeric columns will be used as dimensions and all numeric columns as measures if - not specified otherwise. All column names need to be Python-keyword-compliant. Roaring Bitmaps - (https://roaringbitmap.org) are used to store and query records, Numpy is used for aggregations. - - Parameters - ---------- - df : pd.DataFrame - The DataFrame to be converted to a Cube. - dimensions : list | None - (optional) List of column names from the Pandas DataFrame to be used as dimensions. - measures : list | None - (optional) List of columns names from the Pandas DataFrame to be used as measures. - caching : bool - (optional) If True, the results of the queries will be cached for faster repetitive access. - Examples - -------- - >>> import pandas as pd - >>> from nanocube import NanoCube - >>> # Create a DataFrame - >>> df = pd.DataFrame({'customer': [ 'A', 'B', 'A', 'B', 'A'], - >>> 'product': ['P1', 'P2', 'P3', 'P1', 'P2'], - >>> 'promo': [True, False, True, True, False], - >>> 'sales': [ 100, 200, 300, 400, 500], - >>> 'cost': [ 60, 90, 120, 200, 240]}) - >>> # Convert to a Cube - >>> c_roaring = NanoCube(df) - >>> print(c_roaring.get(customer='A', product='P1')) # [100, 60] - >>> print(c_roaring.get(customer='A')) # [900, 420] - >>> print(c_roaring.get(promo=True)) # [800, 380] - >>> print(c_roaring.get(promo=True)) # [800, 380] - """ - #self.schema = Schema(df=df, dimensions=dimensions, measures=measures) - - self._agg_func: dict = { - "sum": np.nansum, - "mean": np.nanmean, - "min": np.nanmin, - "max": np.nanmax, - "std": np.nanstd, - "var": np.nanvar, - "count": np.count_nonzero, - } - - measures = [c for c in df.columns if is_numeric_dtype(df[c].dtype) and not is_bool_dtype(df[c].dtype)] if measures is None else measures - self.measures:dict = dict([(col, i) for i, col in enumerate(measures)]) - - dimensions = [c for c in df.columns if not c in measures and not is_float_dtype(df[c].dtype)] if dimensions is None else dimensions - self.indexing_method: IndexingMethod = IndexingMethod.from_str(indexing_method) - self.index:NanoIndex = NanoIndex.create(df=df, dimensions=dimensions, indexing_method=self.indexing_method) - self.dimensions:dict = self.index._dimensions - - self.values: list = [df[c].to_numpy() for c in self.measures.keys()] # value vectors (references only) - self.caching:bool = caching - self.cache: dict = {"@":0} if caching else None - - compressor = "zstd" # zstd, lz4, snappy, zlib, blosc, brotli, lzf, lzma, zstd, snappy, bzip2, gif - if compressor == "lz4": - self._compress = lz4.frame.compress - self._decompress = lz4.frame.decompress - elif compressor == "zstd": - self._compress = zstd.ZstdCompressor(level=3).compress - self._decompress = zstd.ZstdDecompressor().decompress - - - def get(self, *args, aggregate: str | None = "sum", **kwargs): - """ - Get the aggregated values for the given dimensions and members. - - :param aggregate: the aggregation function to be used (sum, mean, min, max, std, var, count). - :param kwargs: the dimension column names as keyword and their requested members as argument. - :param args: (optional) some measure column names to be returned. - :return: - The aggregated values as: - - a dict of (measures, value) pairs for all defined measures. - - a scalar if only one measure as arg is given. - - a list of values for multiple measures if multiple args are given. - """ - - if self.caching: - key = f"{args}-{kwargs}" - if key in self.cache: - return self.cache[key] - - rows = self.index.get_rows(**kwargs) - agg_func = self._agg_func[aggregate] # if aggregate in self._agg_func else np.nansum - - if isinstance(rows, array | np.ndarray) and len(rows) > 0: - if len(args) == 0: # return all measures as dict - result = dict([(c, agg_func(self.values[i][rows]).item()) for c, i in self.measures.items()]) - elif len(args) == 1: # return one measure as scalar value - result = agg_func(self.values[self.measures[args[0]]][rows]).item() - else: # return list of measures - result = [agg_func(self.values[self.measures[a]][rows]) for a in args] - elif not rows: # no rows available for the given context - result = 0 - else: # rows == True -> return all rows - if len(args) == 0: - result = dict([(c, agg_func(self.values[i]).item()) for c, i in self.measures.items()]) - elif len(args) == 1: - result = agg_func(self.values[self.measures[args[0]]]).item() - else: - result = [agg_func(self.values[self.measures[a]]).item() for a in args] - - if self.caching: - self.cache[key] = result - return result - - @staticmethod - def load(file_name: str) -> 'NanoCube': - """ - Load a NanoCube from a file. - """ - # Read from Parquet - table = pq.read_table(file_name) - - # Deserialize metadata - bin_data = table[0].to_pylist() - meta = json.loads(bin_data[0]) - - method = meta["indexing_method"] - indexing_method = IndexingMethod.from_str(method) - nc = NanoCube(pd.DataFrame(), indexing_method=indexing_method) - - nc.index._dimensions = meta["dimensions"] - nc.dimensions = meta["dimensions"] - nc.measures = meta["measures"] - nc.values = meta["values"] - value_types = meta["value_types"] - nc.index._bitmaps = [dict([(m, i) for m, i in bm.items()]) for bm in meta["members"]] - - # Deserialize bitmaps - if indexing_method == IndexingMethod.roaring: - for d, bm_dict in enumerate(nc.index._bitmaps): - for m, i in bm_dict.items(): - nc.index._bitmaps[d][m] = BitMap.deserialize(nc._decompress(bin_data[i])) - elif indexing_method == IndexingMethod.numpy: - for d, bm_dict in enumerate(nc.index._bitmaps): - for m, i in bm_dict.items(): - data = BitMap.deserialize(nc._decompress(bin_data[i])).to_array() - nc.index._bitmaps[d][m] = np.array(data) - else: - raise ValueError(f"Unsupported indexing method {indexing_method}") - - # Deserialize values - for i, v in enumerate(nc.values): - value_type = value_types[i] - if value_type == 'float64': - type = np.float64 - elif value_type == 'int64': - type = np.int64 - else: - raise ValueError(f"Unsupported value type {value_type}") - nc.values[i] = np.frombuffer(nc._decompress(bin_data[v]), dtype=type) - - return nc - - - def save(self, file_name: str): - """ - Save the NanoCube to a file. - """ - # Create Arrow schema - schema = pa.schema([ - pa.field('data', pa.binary())], - metadata={"app": "NanoCube", "version": "0.1.2"}) - - # Serialize metadata - meta = {"rows": len(self.values[0]), "dimensions": self.dimensions, "measures": self.measures, - "members": [dict([(m, -1) for m in bm.keys()]) for bm in self.index._bitmaps], - "values": [-1 for _ in self.values], - "value_types": [f"{type(v[0]).__name__}" for v in self.values], - "indexing_method": str(self.indexing_method)} - bin_data = [None, ] - z = 1 - - # Serialize bitmaps - if self.indexing_method == IndexingMethod.roaring: - for i, bm_dict in enumerate(self.index._bitmaps): - for j, bm in enumerate(bm_dict.values()): - bin_data.append(self._compress(bm.serialize())) - meta["members"][i].update({m: z + j for j, m in enumerate(bm_dict.keys())}) - z += len(bm_dict) - elif self.indexing_method == IndexingMethod.numpy: - for i, bm_dict in enumerate(self.index._bitmaps): - for j, bm in enumerate(bm_dict.values()): - bin_data.append(self._compress(BitMap(bm).serialize())) - meta["members"][i].update({m: z + j for j, m in enumerate(bm_dict.keys())}) - z += len(bm_dict) - else: - raise ValueError(f"Unsupported indexing method {self.indexing_method}") - - # Serialize values - for i, v in enumerate(self.values): - bin_data.append(self._compress(v.tobytes())) - meta["values"][i] = z + i - - # Serialize metadata - bin_data[0] = json.dumps(meta).encode('utf-8') - - # write to disk - data = [pa.array(bin_data, type=pa.binary()), ] - pat = pa.Table.from_arrays(data, schema=schema) - pq.write_table(pat, file_name) diff --git a/nanocube_old/nano_index.py b/nanocube_old/nano_index.py deleted file mode 100644 index a3bc97b..0000000 --- a/nanocube_old/nano_index.py +++ /dev/null @@ -1,129 +0,0 @@ -# nanocube - Copyright (c)2024, Thomas Zeutschler, MIT license -from abc import abstractmethod -from array import array -from enum import Enum -from functools import reduce - -import sortednp as snp -import numpy as np -import pandas as pd -from pyroaring import BitMap - - -class IndexingMethod(Enum): - roaring = "roaring" - numpy = "numpy" - - @staticmethod - def from_str(label): - if isinstance(label, str): - return IndexingMethod.numpy if label.lower().strip() == "numpy" else IndexingMethod.roaring - return label - - def __str__(self): - return str(self.value) - - def __eq__(self, other): - if isinstance(other, str): - return self.value == other - return self.value == other.value - -class NanoIndex: - @abstractmethod - def get_rows(self, **kwargs) -> array | bool: - pass - - @property - @abstractmethod - def dimensions(self) -> dict: - pass - - @staticmethod - def create(df: pd.DataFrame, dimensions: 'list | None' = None, - indexing_method: IndexingMethod | str = IndexingMethod.roaring): - if isinstance(indexing_method, str): - indexing_method = IndexingMethod.from_str(indexing_method) - if indexing_method == IndexingMethod.roaring: - return NanoRoaringIndex(df, dimensions) - elif indexing_method == IndexingMethod.numpy: - return NanoNumpyIndex(df, dimensions) - - -class NanoRoaringIndex(NanoIndex): - """NanoCube index.""" - - def __init__(self, df: pd.DataFrame, dimensions: 'list | None' = None): - self._dimensions: dict = dict([(col, i) for i, col in enumerate(dimensions)]) - self._bitmaps: list = [] # bitmaps per dimension per member containing the row ids of the DataFrame - for dimension in self.dimensions.keys(): - member_bitmaps = {} - for row, member in enumerate(df[dimension].to_list()): - if member not in member_bitmaps: - member_bitmaps[member] = BitMap([row]) - else: - member_bitmaps[member].add(row) - self._bitmaps.append(member_bitmaps) - - @property - def dimensions(self) -> dict: - return self._dimensions - - def get_rows(self, **kwargs) -> array | bool: - if kwargs: - bitmaps = [(reduce(lambda x, y: x | y, [self._bitmaps[d][m] for m in kwargs[dim]]) - if (isinstance(kwargs[dim], list) or isinstance(kwargs[dim], tuple)) and not isinstance( - kwargs[dim], str) - else self._bitmaps[d][kwargs[dim]]) for d, dim in enumerate(self.dimensions.keys()) if - dim in kwargs] - if bitmaps: - bitmaps = sorted(bitmaps, key=lambda l: len(l)) - - bitmap = bitmaps[0].copy() - for i in range(1, len(bitmaps)): - bitmap &= bitmaps[i] - return bitmap.to_array() - - return reduce(lambda x, y: x & y, bitmaps).to_array() if bitmaps else False - else: - return False - else: - return True - - -class NanoNumpyIndex(NanoIndex): - """NanoCube index.""" - - def __init__(self, df: pd.DataFrame, dimensions: 'list | None' = None): - self._dimensions: dict = dict([(col, i) for i, col in enumerate(dimensions)]) - self._bitmaps: list = [] # bitmaps per dimension per member containing the row ids of the DataFrame - for col in self.dimensions.keys(): - member_bitmaps = {} - for row, member in enumerate(df[col].to_list()): - if member not in member_bitmaps: - member_bitmaps[member] = BitMap([row]) - else: - member_bitmaps[member].add(row) - - for v in member_bitmaps.keys(): - member_bitmaps[v] = np.array(member_bitmaps[v].to_array()) - self._bitmaps.append(member_bitmaps) - - @property - def dimensions(self) -> dict: - return self._dimensions - - def get_rows(self, **kwargs) -> array | bool: - if kwargs: - bitmaps = [(reduce(lambda x, y: snp.merge(x, y, duplicates=snp.MergeDuplicates.DROP), [self._bitmaps[d][m] for m in kwargs[dim]]) - if isinstance(kwargs[dim], list | tuple) and not isinstance(kwargs[dim], str) - else self._bitmaps[d][kwargs[dim]]) for d, dim in enumerate(self.dimensions.keys()) if dim in kwargs] - - if bitmaps: - bitmaps = sorted(bitmaps, key=lambda l: len(l)) - return reduce(lambda x, y: snp.intersect(x, y, duplicates=snp.IntersectDuplicates.DROP), - bitmaps) if bitmaps else False - # return snp.kway_intersect(bitmaps, assume_sorted=True) if bitmaps else False - else: - return False - else: - return True diff --git a/nanocube_old/schema.py b/nanocube_old/schema.py deleted file mode 100644 index 15bc876..0000000 --- a/nanocube_old/schema.py +++ /dev/null @@ -1,71 +0,0 @@ -# nanocube - Copyright (c)2024, Thomas Zeutschler, MIT license -from dataclasses import dataclass -from typing import Any - -import numpy as np - - -@dataclass -class SchemaDimension: - """Defines a Dimension of a NanoCube.""" - ordinal: int - column: str - dtype: np.dtype | None = None - description: str = None - - -@dataclass -class SchemaMeasure: - """Defines a Measure of a NanoCube.""" - ordinal: int - column: str - dtype: np.dtype | None = None - description: str | None = None - format: str | None = None - - -class Schema: - """ - Schema defining the dimensions and measures of a NanoCube. - """ - - def __init__(self, df: Any | None = None, dimensions: Any | None = None, measures: Any | None = None): - self.measures: list[SchemaMeasure] = [] - self._load_measures(df, measures) - self.dimensions: list[SchemaDimension] = [] - self._load_dimensions(df, dimensions) - - def _load_dimensions(self, df, dimensions): - - if isinstance(dimensions, str): - dimensions = SchemaDimension(ordinal=0, column=dimensions) - if isinstance(dimensions, SchemaDimension): - self.dimensions.append(dimensions) - return - if not isinstance(dimensions, list | tuple): - raise ValueError("Argument `dimensions` must be list or tuple of type " - "`SchemaDimension` or `str` representing a columns name.") - - for i, dimension in enumerate(dimensions): - if isinstance(dimension, str): - self.dimensions.append(SchemaDimension(ordinal=i, column=dimension)) - else: - self.dimensions.append(dimension) - - def _load_measures(self, df, measures): - if measures is None: - return - if isinstance(measures, str): - measures = SchemaMeasure(ordinal=0, column=measures) - if isinstance(measures, SchemaMeasure): - self.measures.append(measures) - return - if not isinstance(measures, list | tuple): - raise ValueError("Argument `measures` must be list or tuple of type " - "`SchemaMeasure` or `str` representing a columns name.") - - for i, measure in enumerate(measures): - if isinstance(measure, str): - self.measures.append(SchemaDimension(ordinal=i, column=measure)) - else: - self.measures.append(measure)